
Fix NPE in KafkaSupervisor.checkpointTaskGroup #6206

Merged: 4 commits into apache:master on Aug 27, 2018

Conversation

jihoonson (Contributor)

Hopefully fixes #6021.

TaskData.status and TaskData.startTime can be null if the supervisor is stopped gracefully before it processes any runNotice, which is what sets them properly.
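For illustration, here is a minimal sketch (hypothetical names, not the actual Druid classes) of the race: the fields are only populated by a run notice, so a graceful stop can observe them while still null.

import javax.annotation.Nullable;

class TaskDataSketch
{
  @Nullable
  String status;    // null until the first runNotice fetches it

  @Nullable
  Long startTime;   // likewise

  void logStatus()
  {
    if (status == null) {
      // The supervisor was stopped gracefully before any runNotice ran,
      // so the status was never populated; treat it as unknown.
      System.out.println("status unknown");
    } else {
      System.out.println("status = " + status);
    }
  }
}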

@clintropolis (Member) left a comment

Seems reasonable to me 👍

taskGroupsToVerify.put(taskGroupId, taskGroup);
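// putIfAbsent returns the previous mapping for taskId, or null if there was none;
// a non-null return therefore means a duplicate task entry, which should never happen.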
final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
if (prevTaskGroup != null) {
throw new ISE(
Contributor

Would it be very surprising if this happened? Enough to stop the supervisor run (i.e., a probable bug)?

Contributor Author

This should never happen, and even if it does, the supervisor would kill the task with the corresponding taskId and respawn the same task. Please check https://github.com/apache/incubator-druid/pull/6206/files/c46d4681c334709caa5cddbd0ce0c67a7d22eaad#diff-6eee87b3aa4eb3a516965fe6e93e25a4L1138.

Contributor

Got it, thanks.

@Nullable
@Override
public Map<Integer, Long> apply(@Nullable Object input)
// task.status can be null if any runNotice is processed before kafkaSupervisor is stopped gracefully.
Contributor

Is this comment right? It sounds like it would probably be the other way around (stop gracefully happens before run notices are processed)

Contributor Author

Good catch. Fixed.

}
} else {
log.info("Killing task [%s] of unknown status", taskId);
Contributor

Do we really want to kill the task in this case -- I thought it could only happen for a supervisor that is stopping gracefully? Maybe we should just ignore the task, and log a warning, rather than killing it?

Contributor Author

From the perspective of checkpointing, if I understand this code correctly, the supervisor is checkpointing because one of the tasks in a taskGroup has processed all assigned events, so all tasks in the taskGroup can be stopped or killed.

I'm not sure why this code is called when stopping the supervisor though.

Contributor Author

Probably we shouldn't checkpoint while stopping the supervisor, but that would be a different issue.

Contributor Author

Hmm, maybe it makes sense to checkpoint, because the supervisor should wait for tasks to finish their jobs, and they should be able to checkpoint in the middle of indexing.

Contributor

I think that we don't want to stop/kill all tasks in the taskGroup just because one of them has processed all assigned events. It could be a checkpoint for an incremental handoff, and we want all tasks to continue running even after the checkpoint. Am I understanding this right?

In other words, it sounds to me like we want to stop/kill all other tasks if any of them has finished (status = success) but we don't want to stop/kill them if it was an incremental handoff.

Contributor Author

I forgot to say one more thing. This code is called only when tasks have been running longer than taskDuration. I don't know what the idea behind checkpointing per taskDuration is, but it expects to stop/kill all running tasks. See https://github.com/apache/incubator-druid/blob/master/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java#L1433.

Contributor

I read the code more closely and now I see that the idea is that at taskDuration, tasks should do a final publish and exit. So that's what finalize is for. The checkpointTaskGroup function, when finalize is true, will check if any task completed, and if so, stop all its replicas. This makes sense, since there is no point in replicas continuing to run if some task in the group is done. (Because they are all doing the same work.)

With your patch, checkpointTaskGroup, when finalize is true, will now kill any task that has null status. I don't see why this is a good thing. After the taskDuration is over, we want to trigger a final checkpoint/publish, and then let all tasks in a group keep running until one of them is successful. Killing one with unknown status seems counter-productive to that goal.

Am I wrong -- is there a reason it's a good idea to kill tasks with unknown status in this case?
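For illustration, a minimal sketch (hypothetical types and names, not the actual KafkaSupervisor code) of the finalize behavior described above:

import java.util.List;

interface TaskSketch
{
  String id();
  String status(); // may be null if the supervisor hasn't learned it yet
}

class CheckpointSketch
{
  static void checkpointTaskGroup(List<TaskSketch> group, boolean finalize)
  {
    if (!finalize) {
      // Incremental handoff: every task keeps running after the checkpoint.
      return;
    }
    // Past taskDuration, each task does a final publish and exits. Once one
    // replica succeeds, the others are redundant (they all do the same work),
    // so they can be stopped.
    boolean anySucceeded = group.stream().anyMatch(t -> "SUCCESS".equals(t.status()));
    if (anySucceeded) {
      group.forEach(t -> System.out.println("stopping replica " + t.id()));
    }
  }
}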

Contributor Author

Hmm, good point. I thought it made sense to kill them because the supervisor currently kills running tasks if they are not yet allocated to middleManagers.

Maybe it makes more sense to keep them because the unknown task status indicates that the supervisor hasn't updated it yet. I'll fix this.

Contributor Author

Fixed.

Contributor

> Hmm, good point. I thought it made sense to kill them because the supervisor currently kills running tasks if they are not yet allocated to middleManagers.

I think killing unassigned running tasks does make sense, since if a task hasn't even started running yet, it has no hope of catching up, so we should just cancel it. However, if there is some risk that the task actually is running but the supervisor just doesn't know where yet, this killing might be over-eager. If that's the case, I think it'd be an issue for a separate PR though.

final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();

Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId);
Contributor

When will this get thrown and what will happen when it gets thrown? I'm wondering what the user experience is going to be like.

Contributor Author

This should never happen. An NPE would be thrown if taskData.status were null; this is just a sanity check so that the potential NPE says what was null. I improved the error message.

Contributor

Thanks.
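For reference, Guava's Preconditions.checkNotNull accepts a %s message template, which is what turns a message-less NPE into a descriptive one. A standalone demo (hypothetical task id, not code from this patch):

import com.google.common.base.Preconditions;

class SanityCheckDemo
{
  public static void main(String[] args)
  {
    String taskId = "index_kafka_example_0"; // hypothetical task id
    Object status = null;
    // Throws: java.lang.NullPointerException: task[index_kafka_example_0] has a null status
    // instead of a bare NPE at some later dereference.
    Preconditions.checkNotNull(status, "task[%s] has a null status", taskId);
  }
}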

@@ -1714,6 +1755,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedException
continue;
}

Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId);
Contributor

Similar question here -- when will this get thrown and what will happen when it gets thrown?

Contributor Author

Same here. This should never happen. An NPE would be thrown if taskData.status were null; this is just a sanity check so that the potential NPE says what was null. I improved the error message.

Contributor

Thanks.

final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
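// A null status means the supervisor hasn't learned this task's state yet; since
// this method is stopping every task in the taskGroup anyway, kill such tasks too.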
if (taskData.status == null) {
killTask(taskId);
Contributor

Is killing the task the right idea here? If we don't know its status is it safe to leave it alone?

Contributor Author

A null taskData.status means that the supervisor hasn't updated it yet, so the actual status can be anything. I think this method should kill tasks whose status is unknown, because it is supposed to stop all tasks in the given taskGroup.

Contributor

Got it, that sounds good.

@gianm (Contributor) commented Aug 26, 2018

I checked out the TeamCity failures; they are all fixed in #6236 and are not related to this patch. The Travis failure looked spurious, so I retried it.

@gianm (Contributor) left a comment

LGTM after Travis. I think we can ignore TeamCity for this one, since the inspections it flagged are not related to this patch, and should be addressed in #6236.

@gianm (Contributor) commented Aug 27, 2018

Merged master into this branch to get the fixes from #6236. Let's see how this goes.

@gianm (Contributor) commented Aug 27, 2018

Seeing these, possibly legitimate failures?

Failed tests: 
  KafkaSupervisorTest.testCheckpointForInactiveTaskGroup:2126 java.lang.AssertionError: 
  Unexpected method call TaskRunner.getRunningTasks():
	at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:44)
	at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:94)
	at com.sun.proxy.$Proxy43.getRunningTasks(Unknown Source)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor$1.getTaskLocation(KafkaSupervisor.java:312)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor.checkpointTaskGroup(KafkaSupervisor.java:1531)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor.checkTaskDuration(KafkaSupervisor.java:1461)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor.runInternal(KafkaSupervisor.java:898)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor$RunNotice.handle(KafkaSupervisor.java:611)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor$2.run(KafkaSupervisor.java:385)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
 expected null, but was:<java.lang.AssertionError: 
  Unexpected method call TaskRunner.getRunningTasks():
	at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:44)
	at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:94)
	at com.sun.proxy.$Proxy43.getRunningTasks(Unknown Source)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor$1.getTaskLocation(KafkaSupervisor.java:312)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor.checkpointTaskGroup(KafkaSupervisor.java:1531)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor.checkTaskDuration(KafkaSupervisor.java:1461)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor.runInternal(KafkaSupervisor.java:898)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor$RunNotice.handle(KafkaSupervisor.java:611)
	at io.druid.indexing.kafka.supervisor.KafkaSupervisor$2.run(KafkaSupervisor.java:385)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
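For context, EasyMock reports "Unexpected method call" when a mocked method is invoked without a recorded expectation. Below is a sketch of the kind of expectation that would satisfy the getRunningTasks() call above, using a simplified stand-in interface rather than Druid's real TaskRunner (hypothetical setup, not the actual KafkaSupervisorTest code):

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

import java.util.Collections;
import java.util.List;

// Simplified stand-in for the real TaskRunner interface.
interface TaskRunnerSketch
{
  List<String> getRunningTasks();
}

class MockSetupSketch
{
  static TaskRunnerSketch mockTaskRunner()
  {
    TaskRunnerSketch runner = createMock(TaskRunnerSketch.class);
    // Without an expectation like this, any call to getRunningTasks() fails
    // the test with "Unexpected method call", as seen in the log above.
    expect(runner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    replay(runner);
    return runner;
  }
}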

@jihoonson (Contributor Author)
@gianm #6207 should fix it.

@gianm (Contributor) commented Aug 27, 2018

@jihoonson Got it, could you please merge master into this branch in order to get that?

@gianm (Contributor) commented Aug 27, 2018

Oh wait, it passed anyway. I guess #6207 isn't required.

gianm merged commit bda5a8a into apache:master on Aug 27, 2018
gianm pushed a commit to gianm/druid that referenced this pull request Aug 27, 2018
* Fix NPE in KafkaSupervisor.checkpointTaskGroup

* address comments

* address comment
gianm pushed a commit to implydata/druid-public that referenced this pull request Aug 27, 2018
* Fix NPE in KafkaSupervisor.checkpointTaskGroup

* address comments

* address comment
fjy pushed a commit that referenced this pull request Aug 27, 2018
* Fix NPE in KafkaSupervisor.checkpointTaskGroup

* address comments

* address comment
Linked issues: NPE in KafkaSupervisor.checkpointTaskGroup (#6021)