Skip to content
Permalink
Browse files
Display row stats for multiphase parallel indexing tasks (#12280)
Row stats are reported for single phase tasks in the `/liveReports` and `/rowStats` APIs
and are also a part of the overall task report. This commit adds changes to report
row stats for multiphase tasks too.

Changes:
- Add `TaskReport` in `GeneratedPartitionsReport` generated during hash and range partitioning
- Collect the reports for `index_generate` phase in `ParallelIndexSupervisorTask`
  • Loading branch information
tejaswini-imply committed Mar 2, 2022
1 parent 50038d9 commit 1af4c9c933cd62804d106c9a2974dc0b3cf75780
Show file tree
Hide file tree
Showing 14 changed files with 575 additions and 228 deletions.
@@ -21,8 +21,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;

import java.util.List;
import java.util.Map;

/**
* Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is
@@ -35,9 +37,10 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport
/**
 * Jackson-deserializable constructor.
 *
 * @param taskId         id of the sub-task that produced this report
 * @param partitionStats stats of the partitions generated by the sub-task
 * @param taskReport     task report of the sub-task; may be null or empty
 *                       (e.g. when sent by an older worker that does not report it)
 */
@JsonCreator
GeneratedPartitionsMetadataReport(
    @JsonProperty("taskId") String taskId,
    @JsonProperty("partitionStats") List<PartitionStat> partitionStats,
    @JsonProperty("taskReport") Map<String, TaskReport> taskReport
)
{
  super(taskId, partitionStats, taskReport);
}
}
@@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
@@ -33,11 +35,13 @@ public class GeneratedPartitionsReport implements SubTaskReport
{
private final String taskId;
private final List<PartitionStat> partitionStats;
private final Map<String, TaskReport> taskReport;

/**
 * Creates a report for a sub-task of the partial segment generation phase.
 *
 * @param taskId         id of the sub-task that produced this report
 * @param partitionStats stats of the partitions generated by the sub-task
 * @param taskReport     task report of the sub-task; may be null or empty
 */
GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, Map<String, TaskReport> taskReport)
{
  this.taskId = taskId;
  this.partitionStats = partitionStats;
  this.taskReport = taskReport;
}

@Override
@@ -47,6 +51,12 @@ public String getTaskId()
return taskId;
}

/**
 * Task report produced by the sub-task during segment generation.
 * NOTE(review): callers null-check and isEmpty-check this value, so it is
 * presumably nullable — e.g. for reports from older workers; confirm.
 */
@JsonProperty
public Map<String, TaskReport> getTaskReport()
{
return taskReport;
}

@JsonProperty
public List<PartitionStat> getPartitionStats()
{
@@ -64,13 +74,14 @@ public boolean equals(Object o)
}
GeneratedPartitionsReport that = (GeneratedPartitionsReport) o;
return Objects.equals(taskId, that.taskId) &&
Objects.equals(partitionStats, that.partitionStats);
Objects.equals(partitionStats, that.partitionStats) &&
Objects.equals(taskReport, that.taskReport);
}

@Override
public int hashCode()
{
  // Must stay consistent with equals(), which compares taskId, partitionStats
  // and taskReport; include all three in the hash.
  return Objects.hash(taskId, partitionStats, taskReport);
}

@Override
@@ -79,6 +90,7 @@ public String toString()
return "GeneratedPartitionsReport{" +
"taskId='" + taskId + '\'' +
", partitionStats=" + partitionStats +
", taskReport=" + taskReport +
'}';
}
}
@@ -66,6 +66,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.incremental.MutableRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
@@ -176,6 +177,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@MonotonicNonNull
private volatile TaskToolbox toolbox;

@MonotonicNonNull
private Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;

private IngestionState ingestionState;

@JsonCreator
@@ -726,6 +730,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
);
return TaskStatus.failure(getId(), errMsg);
}
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);

// 2. Partial segment merge phase
// partition (interval, partitionId) -> partition locations
@@ -814,6 +819,8 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
return TaskStatus.failure(getId(), errMsg);
}

indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);

// partition (interval, partitionId) -> partition locations
Map<Partition, List<PartitionLocation>> partitionToLocations =
getPartitionToLocations(indexingRunner.getReports());
@@ -1477,10 +1484,7 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
boolean includeUnparseable
)
{
long processed = 0L;
long processedWithError = 0L;
long thrownAway = 0L;
long unparseable = 0L;
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();

List<ParseExceptionReport> unparseableEvents = new ArrayList<>();

@@ -1492,35 +1496,75 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
continue;
}
IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
IngestionStatsAndErrorsTaskReport.REPORT_KEY);
IngestionStatsAndErrorsTaskReportData reportData =
(IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
);
RowIngestionMetersTotals rowIngestionMetersTotals =
getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);

buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}

RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
parallelSinglePhaseRunner.getRunningTaskIds(),
unparseableEvents,
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}

if (includeUnparseable) {
List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(taskUnparsebleEvents);
/**
 * Builds the (rowStats, unparseableEvents) map pair for multiphase (hash/range
 * partitioned) parallel ingestion.
 * <p>
 * Row stats are only produced by the "partial segment generation" phase; once that
 * phase finishes, its aggregated stats are cached in {@code indexGenerateRowStats}
 * and served for the remainder of the task. While the generate phase is running,
 * stats are aggregated from the completed sub-task reports plus live reports
 * fetched from still-running sub-tasks.
 *
 * @param currentRunner      runner of the phase currently executing
 * @param includeUnparseable whether to include unparseable events in the result
 * @return pair of (rowStats map, unparseableEvents map); both empty for phases
 *         other than segment generation when no cached stats exist
 */
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
    ParallelIndexTaskRunner<?, ?> currentRunner,
    boolean includeUnparseable
)
{
  if (indexGenerateRowStats != null) {
    // Generate phase already finished; serve the cached stats.
    return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
  } else if (!currentRunner.getName().equals("partial segment generation")) {
    // Only the generate phase produces row stats.
    return Pair.of(ImmutableMap.of(), ImmutableMap.of());
  } else {
    Map<String, GeneratedPartitionsReport> completedSubtaskReports =
        (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();

    final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
    final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
    for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
      Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
      if (taskReport == null || taskReport.isEmpty()) {
        LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
        continue;
      }
      // NOTE(review): unparseable events from completed sub-tasks are always
      // collected (includeUnparseable is applied when serving the cached pair
      // above) — confirm this is intended for the not-yet-cached path too.
      RowIngestionMetersTotals rowStatsForCompletedTask =
          getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);

      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
    }

    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
        currentRunner.getRunningTaskIds(),
        unparseableEvents,
        includeUnparseable
    );
    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);

    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
  }
}

// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
Set<String> runningTaskIds,
List<ParseExceptionReport> unparseableEvents,
boolean includeUnparseable
)
{
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
for (String runningTaskId : runningTaskIds) {
try {
Map<String, Object> report = toolbox.getIndexingServiceClient().getTaskReport(runningTaskId);
if (report == null || report.isEmpty()) {
// task does not have a running report yet
continue;
}

Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
@@ -1529,33 +1573,53 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab

if (includeUnparseable) {
Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>) taskUnparseableEvents.get(
RowIngestionMeters.BUILD_SEGMENTS
);
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(buildSegmentsUnparseableEvents);
}

processed += ((Number) buildSegments.get("processed")).longValue();
processedWithError += ((Number) buildSegments.get("processedWithError")).longValue();
thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
unparseable += ((Number) buildSegments.get("unparseable")).longValue();
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
}
catch (Exception e) {
LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
}
}
return buildSegmentsRowStats.getTotals();
}

/**
 * Packages aggregated row stats and unparseable events into the
 * (rowStats, unparseableEvents) map pair returned by the row-stats and
 * live-report APIs.
 *
 * @param rowStats          aggregated build-segments totals
 * @param unparseableEvents parse-exception reports collected from sub-tasks
 * @return pair of (map with key "totals" -> buildSegments totals,
 *         map with key buildSegments -> unparseable events)
 */
private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
    RowIngestionMetersTotals rowStats,
    List<ParseExceptionReport> unparseableEvents
)
{
  Map<String, Object> rowStatsMap = new HashMap<>();
  Map<String, Object> totalsMap = new HashMap<>();
  totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
  rowStatsMap.put("totals", totalsMap);

  return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
}

/**
 * Extracts the build-segments row stats from a sub-task's ingestion-stats report.
 * When {@code includeUnparseable} is set, the sub-task's build-segments parse
 * exceptions are also appended to {@code unparseableEvents}.
 *
 * @param taskReport         report map from the sub-task, keyed by report type
 * @param includeUnparseable whether to accumulate the sub-task's unparseable events
 * @param unparseableEvents  sink for parse-exception reports (mutated in place)
 * @return the sub-task's build-segments row totals
 */
private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
    Map<String, TaskReport> taskReport,
    boolean includeUnparseable,
    List<ParseExceptionReport> unparseableEvents)
{
  final IngestionStatsAndErrorsTaskReport statsAndErrorsReport =
      (IngestionStatsAndErrorsTaskReport) taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
  final IngestionStatsAndErrorsTaskReportData payload =
      (IngestionStatsAndErrorsTaskReportData) statsAndErrorsReport.getPayload();

  final RowIngestionMetersTotals buildSegmentsTotals =
      getTotalsFromBuildSegmentsRowStats(payload.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS));

  if (includeUnparseable) {
    final List<ParseExceptionReport> subtaskUnparseableEvents =
        (List<ParseExceptionReport>) payload.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
    unparseableEvents.addAll(subtaskUnparseableEvents);
  }

  return buildSegmentsTotals;
}

private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable)
{
if (currentSubTaskHolder == null) {
@@ -1569,8 +1633,10 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab

if (isParallelMode()) {
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
// multiphase is not supported yet
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
return doGetRowStatsAndUnparseableEventsParallelMultiPhase(
(ParallelIndexTaskRunner<?, ?>) currentRunner,
includeUnparseable
);
} else {
return doGetRowStatsAndUnparseableEventsParallelSinglePhase(
(SinglePhaseParallelIndexTaskRunner) currentRunner,
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -161,12 +162,12 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
}

@Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
}

/**
@@ -25,6 +25,7 @@
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -175,11 +176,11 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
}

@Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
}
}

0 comments on commit 1af4c9c

Please sign in to comment.