Skip to content

Commit

Permalink
Fix two issues with Coordinator -> Overlord communication. (#7412)
Browse files Browse the repository at this point in the history
* Fix two issues with Coordinator -> Overlord communication.

1) ClientCompactQuery needs to recognize the potential for 'intervals'
to be set instead of 'segments'. The lack of this led to a
NullPointerException on DruidCoordinatorSegmentCompactor.java:102.

2) In two locations (DruidCoordinatorSegmentCompactor,
DruidCoordinatorCleanupPendingSegments) tasks were being retrieved
using waiting/pending/running tasks in the wrong order: by checking
'running' first and then 'pending', tasks could be missed if they
moved from 'pending' to 'running' in between the two calls. Replaced
these methods with calls to 'getActiveTasks', a new method that does
the calls in the right order.

* Remove unused import.
  • Loading branch information
gianm authored and fjy committed Apr 4, 2019
1 parent d29a320 commit 78745fe
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
private final List<DataSegment> segments;
private final Interval interval;
private final boolean keepSegmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
Expand All @@ -40,7 +43,8 @@ public class ClientCompactQuery implements ClientQuery
@JsonCreator
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@Nullable @JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
Expand All @@ -49,6 +53,7 @@ public ClientCompactQuery(
{
this.dataSource = dataSource;
this.segments = segments;
this.interval = interval;
this.keepSegmentGranularity = keepSegmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
Expand All @@ -75,6 +80,12 @@ public List<DataSegment> getSegments()
return segments;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@JsonProperty
public boolean isKeepSegmentGranularity()
{
Expand All @@ -100,12 +111,46 @@ public Map<String, Object> getContext()
return context;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClientCompactQuery that = (ClientCompactQuery) o;
return keepSegmentGranularity == that.keepSegmentGranularity &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(segments, that.segments) &&
Objects.equals(interval, that.interval) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(context, that.context);
}

@Override
public int hashCode()
{
return Objects.hash(
dataSource,
segments,
interval,
keepSegmentGranularity,
targetCompactionSizeBytes,
tuningConfig,
context
);
}

@Override
public String toString()
{
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", interval=" + interval +
", keepSegmentGranularity=" + keepSegmentGranularity +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
Expand All @@ -40,10 +41,13 @@
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HttpIndexingServiceClient implements IndexingServiceClient
{
Expand Down Expand Up @@ -90,6 +94,7 @@ public String compactSegments(
return runTask(
new ClientCompactQuery(
dataSource,
null,
segments,
keepSegmentGranularity,
targetCompactionSizeBytes,
Expand Down Expand Up @@ -195,21 +200,30 @@ public int getTotalWorkerCapacity()
}

@Override
public List<TaskStatusPlus> getRunningTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return getTasks("runningTasks");
}
// Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between
// calls then we still catch them. (Tasks always go waiting -> pending -> running.)
//
// Consider switching to new-style /druid/indexer/v1/tasks API in the future.
final List<TaskStatusPlus> tasks = new ArrayList<>();
final Set<String> taskIdsSeen = new HashSet<>();

final Iterable<TaskStatusPlus> activeTasks = Iterables.concat(
getTasks("waitingTasks"),
getTasks("pendingTasks"),
getTasks("runningTasks")
);

@Override
public List<TaskStatusPlus> getPendingTasks()
{
return getTasks("pendingTasks");
}
for (TaskStatusPlus task : activeTasks) {
// Use taskIdsSeen to prevent returning the same task ID more than once (if it hops from 'pending' to 'running',
// for example, and we see it twice.)
if (taskIdsSeen.add(task.getId())) {
tasks.add(task);
}
}

@Override
public List<TaskStatusPlus> getWaitingTasks()
{
return getTasks("waitingTasks");
return tasks;
}

private List<TaskStatusPlus> getTasks(String endpointSuffix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ String compactSegments(

String killTask(String taskId);

List<TaskStatusPlus> getRunningTasks();

List<TaskStatusPlus> getPendingTasks();

List<TaskStatusPlus> getWaitingTasks();
/**
* Gets all tasks that are waiting, pending, or running.
*/
List<TaskStatusPlus> getActiveTasks();

TaskStatusResponse getTaskStatus(String taskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final List<DateTime> createdTimes = new ArrayList<>();
createdTimes.add(
indexingServiceClient
.getRunningTasks()
.getActiveTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no running tasks, this returns the current time.
);
createdTimes.add(
indexingServiceClient
.getPendingTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no pending tasks, this returns the current time.
);
createdTimes.add(
indexingServiceClient
.getWaitingTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no waiting tasks, this returns the current time.
.orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time.
);

final TaskStatusPlus completeTaskStatus = indexingServiceClient.getLastCompleteTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -84,11 +83,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
indexingServiceClient.getRunningTasks(),
indexingServiceClient.getPendingTasks(),
indexingServiceClient.getWaitingTasks()
);
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks());
// dataSource -> list of intervals of compact tasks
final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
for (TaskStatusPlus status : compactTasks) {
Expand All @@ -98,13 +93,22 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
final Interval interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
final Interval interval;

if (compactQuery.getSegments() != null) {
interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
} else if (compactQuery.getInterval() != null) {
interval = compactQuery.getInterval();
} else {
throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId());
}

compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
Expand Down Expand Up @@ -146,13 +150,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
.build();
}

@SafeVarargs
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses)
{
final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);

return allTaskStatusPlus
return taskStatuses
.stream()
.filter(status -> {
final String taskType = status.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -74,21 +75,9 @@ public String killTask(String taskId)
}

@Override
public List<TaskStatusPlus> getRunningTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return null;
}

@Override
public List<TaskStatusPlus> getPendingTasks()
{
return null;
}

@Override
public List<TaskStatusPlus> getWaitingTasks()
{
return null;
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,7 @@ public String compactSegments(
}

@Override
public List<TaskStatusPlus> getRunningTasks()
{
return Collections.emptyList();
}

@Override
public List<TaskStatusPlus> getPendingTasks()
{
return Collections.emptyList();
}

@Override
public List<TaskStatusPlus> getWaitingTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return Collections.emptyList();
}
Expand Down

0 comments on commit 78745fe

Please sign in to comment.