Skip to content
Permalink
Browse files
Fix a race condition in the '/tasks' Overlord API (#12330)
* finds complete and active tasks from the same snapshot

* overlord resource

* unit test

* integration test

* javadoc and cleanup

* more cleanup

* fix test and add more
  • Loading branch information
jihoonson committed Mar 17, 2022
1 parent d745d0b commit 5e23674fe53339308ba1caaca48a80e7c9680da7
Show file tree
Hide file tree
Showing 23 changed files with 1,257 additions and 840 deletions.
@@ -25,6 +25,8 @@
*/
public enum RunnerTaskState
{
// Waiting tasks are not tracked.
// Instead, they are computed by (all tasks in metadata store - all tasks in taskRunner).
WAITING,
PENDING,
RUNNING,
@@ -77,4 +77,3 @@ public EntryType getTask()
return task;
}
}

@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

@@ -66,10 +67,10 @@ public static TaskStatus failure(String taskId, String errorMsg)
}

/**
* This method is deprecated because it does not handle the error message of failed task status properly.
* This method is deprecated for production because it does not handle the error message of failed task status properly.
* Use {@link #success(String)} or {@link #failure(String, String)} instead.
*/
@Deprecated
@VisibleForTesting
public static TaskStatus fromCode(String taskId, TaskState code)
{
return new TaskStatus(taskId, code, -1, null, null);
@@ -48,7 +48,7 @@
public TaskStatusPlus(
String id,
@Nullable String groupId,
String type, // nullable for backward compatibility
@Nullable String type, // nullable for backward compatibility
DateTime createdTime,
DateTime queueInsertionTime,
@Nullable TaskState statusCode,
@@ -21,10 +21,12 @@

import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -82,26 +84,28 @@ void insert(
TaskInfo<EntryType, StatusType> getTaskInfo(String entryId);

/**
* Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries
* created on or later than the given timestamp
* Returns a list of {@link TaskInfo} from metadata store that matches to the given filters.
*
* @param timestamp timestamp
* @param maxNumStatuses maxNumStatuses
* If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
* If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
* store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
* All lookups should be processed atomically if there are more than one lookup is given.
*
* @return list of {@link TaskInfo}
* @param taskLookups task lookup type and filters.
* @param datasource datasource filter
*/
List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo(
DateTime timestamp,
@Nullable Integer maxNumStatuses,
List<TaskInfo<EntryType, StatusType>> getTaskInfos(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
);

/**
* Return {@link TaskInfo} objects for all active entries
*
* @return list of {@link TaskInfo}
*/
List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource);
default List<TaskInfo<EntryType, StatusType>> getTaskInfos(
TaskLookup taskLookup,
@Nullable String datasource
)
{
return getTaskInfos(Collections.singletonMap(taskLookup.getType(), taskLookup), datasource);
}

/**
* Add a lock to the given entry
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* Lookup types and parameters for task lookups in the metadata store.
*/
public interface TaskLookup
{
/**
* Task state in the metadata store.
* Complete tasks are the tasks that have been either succeeded or failed.
* Active tasks are the tasks that are not complete tasks.
*/
enum TaskLookupType
{
ACTIVE,
COMPLETE
}

TaskLookupType getType();

/**
* Task lookup for complete tasks. It includes optional filters for task lookups.
* When the filters are given, the task lookup returns only the tasks that satisfy all filters.
*/
class CompleteTaskLookup implements TaskLookup
{
/**
* Limits the number of taskStatuses to return.
*/
@Nullable
private final Integer maxTaskStatuses;

/**
* Returns only the tasks created prior to the given timestamp.
*/
@Nullable
private final DateTime tasksCreatedPriorTo;

public static CompleteTaskLookup of(
@Nullable Integer maxTaskStatuses,
@Nullable Duration durationBeforeNow
)
{
return new CompleteTaskLookup(
maxTaskStatuses,
durationBeforeNow == null ? null : computeTimestampPriorToNow(durationBeforeNow)
);
}

@VisibleForTesting
public static CompleteTaskLookup withTasksCreatedPriorTo(
@Nullable Integer maxTaskStatuses,
@Nullable DateTime tasksCreatedPriorTo
)
{
return new CompleteTaskLookup(maxTaskStatuses, tasksCreatedPriorTo);
}

private CompleteTaskLookup(
@Nullable Integer maxTaskStatuses,
@Nullable DateTime tasksCreatedPriorTo
)
{
this.maxTaskStatuses = maxTaskStatuses;
this.tasksCreatedPriorTo = tasksCreatedPriorTo;
}

public boolean hasTaskCreatedTimeFilter()
{
return tasksCreatedPriorTo != null;
}

public CompleteTaskLookup withDurationBeforeNow(Duration durationBeforeNow)
{
return CompleteTaskLookup.of(
maxTaskStatuses,
Preconditions.checkNotNull(durationBeforeNow, "durationBeforeNow")
);
}

private static DateTime computeTimestampPriorToNow(Duration durationBeforeNow)
{
return DateTimes
.nowUtc()
.minus(durationBeforeNow);
}

public DateTime getTasksCreatedPriorTo()
{
assert tasksCreatedPriorTo != null;
return tasksCreatedPriorTo;
}

@Nullable
public Integer getMaxTaskStatuses()
{
return maxTaskStatuses;
}

@Override
public TaskLookupType getType()
{
return TaskLookupType.COMPLETE;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompleteTaskLookup that = (CompleteTaskLookup) o;
return Objects.equals(maxTaskStatuses, that.maxTaskStatuses)
&& Objects.equals(tasksCreatedPriorTo, that.tasksCreatedPriorTo);
}

@Override
public int hashCode()
{
return Objects.hash(maxTaskStatuses, tasksCreatedPriorTo);
}
}

class ActiveTaskLookup implements TaskLookup
{
private static final ActiveTaskLookup INSTANCE = new ActiveTaskLookup();

public static ActiveTaskLookup getInstance()
{
return INSTANCE;
}

private ActiveTaskLookup()
{
}

@Override
public TaskLookupType getType()
{
return TaskLookupType.ACTIVE;
}

@Override
public int hashCode()
{
return 0;
}

@Override
public boolean equals(Object obj)
{
return obj instanceof ActiveTaskLookup;
}
}
}
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.metadata;

import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;

@RunWith(Enclosed.class)
public class TaskLookupTest
{
public static class CompleteTaskLookupTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(CompleteTaskLookup.class).usingGetClass().verify();
}

@Test
public void testGetType()
{
Assert.assertEquals(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null).getType());
}

@Test
public void testNullParams()
{
final CompleteTaskLookup lookup = CompleteTaskLookup.of(null, null);
Assert.assertNull(lookup.getMaxTaskStatuses());
Assert.assertFalse(lookup.hasTaskCreatedTimeFilter());
Assert.assertThrows(AssertionError.class, lookup::getTasksCreatedPriorTo);
}

@Test
public void testWithDurationBeforeNow()
{
final Duration duration = new Period("P1D").toStandardDuration();
final DateTime timestampBeforeLookupCreated = DateTimes.nowUtc().minus(duration);
final CompleteTaskLookup lookup = CompleteTaskLookup
.of(null, null)
.withDurationBeforeNow(duration);
Assert.assertNull(lookup.getMaxTaskStatuses());
Assert.assertTrue(
timestampBeforeLookupCreated.isEqual(lookup.getTasksCreatedPriorTo())
|| timestampBeforeLookupCreated.isBefore(lookup.getTasksCreatedPriorTo())
);
}

@Test
public void testNonNullParams()
{
final Duration duration = new Period("P1D").toStandardDuration();
final DateTime timestampBeforeLookupCreated = DateTimes.nowUtc().minus(duration);
final CompleteTaskLookup lookup = CompleteTaskLookup.of(3, duration);
Assert.assertNotNull(lookup.getMaxTaskStatuses());
Assert.assertEquals(3, lookup.getMaxTaskStatuses().intValue());
Assert.assertTrue(lookup.hasTaskCreatedTimeFilter());
Assert.assertTrue(
timestampBeforeLookupCreated.isEqual(lookup.getTasksCreatedPriorTo())
|| timestampBeforeLookupCreated.isBefore(lookup.getTasksCreatedPriorTo())
);
}
}

public static class ActiveTaskLookupTest
{
@Test
public void testSingleton()
{
final ActiveTaskLookup lookup1 = ActiveTaskLookup.getInstance();
final ActiveTaskLookup lookup2 = ActiveTaskLookup.getInstance();
Assert.assertEquals(lookup1, lookup2);
Assert.assertSame(lookup1, lookup2);
}

@Test
public void testGetType()
{
Assert.assertEquals(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance().getType());
}
}
}

0 comments on commit 5e23674

Please sign in to comment.