Skip to content

Commit

Permalink
Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206)
Browse files Browse the repository at this point in the history
* Fix NPE in KafkaSupervisor.checkpointTaskGroup

* address comments

* address comment
  • Loading branch information
jihoonson authored and gianm committed Aug 27, 2018
1 parent 0172326 commit bda5a8a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor
*/
private class TaskGroup
{
final int groupId;

// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
// this task group has completed successfully, at which point this will be destroyed and a new task group will be
Expand All @@ -161,11 +163,13 @@ private class TaskGroup
final String baseSequenceName;

TaskGroup(
int groupId,
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
this.groupId = groupId;
this.partitionOffsets = partitionOffsets;
this.minimumMessageTime = minimumMessageTime;
this.maximumMessageTime = maximumMessageTime;
Expand All @@ -187,9 +191,21 @@ Set<String> taskIds()

private static class TaskData
{
@Nullable
volatile TaskStatus status;
@Nullable
volatile DateTime startTime;
volatile Map<Integer, Long> currentOffsets = new HashMap<>();

@Override
public String toString()
{
return "TaskData{" +
"status=" + status +
", startTime=" + startTime +
", currentOffsets=" + currentOffsets +
'}';
}
}

// Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
Expand Down Expand Up @@ -718,8 +734,8 @@ public void handle() throws ExecutionException, InterruptedException
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
taskGroup.addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
}
}
Expand Down Expand Up @@ -785,10 +801,13 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
: currentMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.get(resetPartitionOffset.getKey());
final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
if (partitionOffsetInMetadataStore != null ||
(partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
.equals(resetPartitionOffset.getValue()))) {
final TaskGroup partitionTaskGroup = taskGroups.get(
getTaskGroupIdForPartition(resetPartitionOffset.getKey())
);
final boolean isSameOffset = partitionTaskGroup != null
&& partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
.equals(resetPartitionOffset.getValue());
if (partitionOffsetInMetadataStore != null || isSameOffset) {
doReset = true;
break;
}
Expand Down Expand Up @@ -1012,7 +1031,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti
List<String> futureTaskIds = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
List<Task> tasks = taskStorage.getActiveTasks();
final Set<Integer> taskGroupsToVerify = new HashSet<>();
final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();

for (Task task : tasks) {
if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
Expand Down Expand Up @@ -1119,6 +1138,7 @@ public Boolean apply(KafkaIndexTask.Status status)
k -> {
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
return new TaskGroup(
taskGroupId,
ImmutableMap.copyOf(
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
),
Expand All @@ -1127,8 +1147,15 @@ public Boolean apply(KafkaIndexTask.Status status)
);
}
);
taskGroupsToVerify.add(taskGroupId);
taskGroup.tasks.putIfAbsent(taskId, new TaskData());
taskGroupsToVerify.put(taskGroupId, taskGroup);
final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
if (prevTaskGroup != null) {
throw new ISE(
"WTH? a taskGroup[%s] already exists for new task[%s]",
prevTaskGroup,
taskId
);
}
}
}
return true;
Expand Down Expand Up @@ -1156,7 +1183,7 @@ public Boolean apply(KafkaIndexTask.Status status)
log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);

// make sure the checkpoints are consistent with each other and with the metadata store
taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
}

/**
Expand All @@ -1166,10 +1193,9 @@ public Boolean apply(KafkaIndexTask.Status status)
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
* created tasks for the taskGroup start indexing from after the latest published offsets.
*/
private void verifyAndMergeCheckpoints(final Integer groupId)
private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
{
final TaskGroup taskGroup = taskGroups.get(groupId);

final int groupId = taskGroup.groupId;
// List<TaskId, Map -> {SequenceId, Checkpoints}>
final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
Expand Down Expand Up @@ -1330,6 +1356,7 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
// reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events
TaskGroup newTaskGroup = new TaskGroup(
groupId,
ImmutableMap.copyOf(startingPartitions),
Optional.absent(),
Optional.absent()
Expand Down Expand Up @@ -1367,8 +1394,8 @@ public Boolean apply(@Nullable DateTime startTime)
}

taskData.startTime = startTime;
long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis()
- taskData.startTime.getMillis());
long millisRemaining = ioConfig.getTaskDuration().getMillis() -
(System.currentTimeMillis() - taskData.startTime.getMillis());
if (millisRemaining > 0) {
scheduledExec.schedule(
buildRunTask(),
Expand Down Expand Up @@ -1421,7 +1448,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (earliestTaskStart.isAfter(taskData.startTime)) {
// startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}
Expand All @@ -1430,7 +1458,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(groupId, true));
futures.add(checkpointTaskGroup(group, true));
}
}

Expand Down Expand Up @@ -1468,10 +1496,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
}
}

private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
{
final TaskGroup taskGroup = taskGroups.get(groupId);

if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
Expand All @@ -1480,30 +1506,33 @@ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int group
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();

if (task.status.isSuccess()) {
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
// failed and we need to re-ingest)
return Futures.transform(
stopTasksInGroup(taskGroup),
new Function<Object, Map<Integer, Long>>()
{
@Nullable
@Override
public Map<Integer, Long> apply(@Nullable Object input)
// task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
if (task.status != null) {
if (task.status.isSuccess()) {
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if
// publishing failed and we need to re-ingest)
return Futures.transform(
stopTasksInGroup(taskGroup),
new Function<Object, Map<Integer, Long>>()
{
return null;
@Nullable
@Override
public Map<Integer, Long> apply(@Nullable Object input)
{
return null;
}
}
}
);
}
);
}

if (task.status.isRunnable()) {
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
killTask(taskId);
i.remove();
if (task.status.isRunnable()) {
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
killTask(taskId);
i.remove();
}
}
}
}
Expand Down Expand Up @@ -1550,7 +1579,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());

if (setEndOffsetTaskIds.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

Expand All @@ -1561,11 +1590,15 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
"Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
endOffsets,
taskGroup.sequenceOffsets.lastEntry().getValue(),
groupId
taskGroup.groupId
);
}

log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
log.info(
"Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
taskGroup.groupId,
endOffsets
);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}
Expand All @@ -1587,7 +1620,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
}

if (taskGroup.tasks.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

Expand Down Expand Up @@ -1627,11 +1660,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
continue;
}

Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
while (iTask.hasNext()) {
Map.Entry<String, TaskData> task = iTask.next();
final Entry<String, TaskData> entry = iTask.next();
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();

Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);

if (task.getValue().status.isFailure()) {
if (taskData.status.isFailure()) {
iTask.remove(); // remove failed task
if (group.tasks.isEmpty()) {
// if all tasks in the group have failed, just nuke all task groups with this partition set and restart
Expand All @@ -1640,10 +1677,10 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
}
}

if (task.getValue().status.isSuccess()) {
if (taskData.status.isSuccess()) {
// If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
// we no longer need them to publish their segment.
log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
futures.add(stopTasksInGroup(group));
foundSuccess = true;
toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
Expand Down Expand Up @@ -1714,6 +1751,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
continue;
}

Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);

// remove failed tasks
if (taskData.status.isFailure()) {
iTasks.remove();
Expand Down Expand Up @@ -1741,7 +1780,7 @@ void createNewTasks() throws JsonProcessingException
taskGroups.entrySet()
.stream()
.filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue()));

// check that there is a current task group for each group of partitions in [partitionGroups]
for (Integer groupId : partitionGroups.keySet()) {
Expand All @@ -1757,6 +1796,7 @@ void createNewTasks() throws JsonProcessingException
) : Optional.absent());

final TaskGroup taskGroup = new TaskGroup(
groupId,
generateStartingOffsetsForPartitionGroup(groupId),
minimumMessageTime,
maximumMessageTime
Expand Down Expand Up @@ -1984,8 +2024,12 @@ private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)

final List<ListenableFuture<Void>> futures = Lists.newArrayList();
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
if (!entry.getValue().status.isComplete()) {
futures.add(stopTask(entry.getKey(), false));
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
if (taskData.status == null) {
killTask(taskId);
} else if (!taskData.status.isComplete()) {
futures.add(stopTask(taskId, false));
}
}

Expand Down Expand Up @@ -2066,7 +2110,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
DateTime startTime = entry.getValue().startTime;
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (startTime != null) {
Expand All @@ -2093,7 +2137,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in
for (TaskGroup taskGroup : taskGroups) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
DateTime startTime = entry.getValue().startTime;
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (taskGroup.completionTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public TaskReportData(
String id,
@Nullable Map<Integer, Long> startingOffsets,
@Nullable Map<Integer, Long> currentOffsets,
DateTime startTime,
@Nullable DateTime startTime,
Long remainingSeconds,
TaskType type,
@Nullable Map<Integer, Long> lag
Expand Down

0 comments on commit bda5a8a

Please sign in to comment.