Add stats segmentsRead and segmentsPublished to compaction task reports (#15947)

Changes:
- Add visibility into number of segments read/published by each parallel compaction
- Add new fields `segmentsRead`, `segmentsPublished` to `IngestionStatsAndErrorsTaskReportData`
- Update `ParallelIndexSupervisorTask` to populate the new stats
adithyachakilam committed Mar 7, 2024
1 parent ebf3bdd commit 564c44e
Showing 14 changed files with 193 additions and 14 deletions.
8 changes: 8 additions & 0 deletions docs/ingestion/tasks.md
@@ -155,6 +155,14 @@ For some task types, the indexing task can wait for the newly ingested segments
|`segmentAvailabilityWaitTimeMs`|Milliseconds the ingestion task waited for the newly ingested segments to become available for query after ingestion completed.|
|`recordsProcessed`|Partitions processed by the ingestion task, including the count of records processed from each partition.|


#### Compaction task segment info fields

|Field|Description|
|---|---|
|`segmentsRead`|Number of segments read by a compaction task with more than one subtask.|
|`segmentsPublished`|Number of segments published by a compaction task with more than one subtask.|
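
For illustration only (not part of this commit), here is a minimal Java sketch of reading the two counters out of a task's `ingestionStatsAndErrors` report payload. The class name and the hard-coded sample payload are assumptions; the field names come from this change, and the sample values mirror the parallel-compaction test further down (3 segments read, 6 published).

```java
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CompactionReportCheck
{
  public static void main(String[] args) throws Exception
  {
    // Illustrative ingestionStatsAndErrors payload, shaped like a compaction task's report.
    String json = "{\"ingestionState\":\"COMPLETED\",\"unparseableEvents\":{},"
                  + "\"rowStats\":{},\"segmentAvailabilityConfirmed\":false,"
                  + "\"segmentAvailabilityWaitTimeMs\":0,"
                  + "\"segmentsRead\":3,\"segmentsPublished\":6}";
    Map<String, Object> payload = new ObjectMapper().readValue(json, Map.class);

    // Both keys are omitted entirely for non-compaction tasks, so read them defensively.
    Number segmentsRead = (Number) payload.get("segmentsRead");
    Number segmentsPublished = (Number) payload.get("segmentsPublished");
    System.out.println("segmentsRead=" + segmentsRead + ", segmentsPublished=" + segmentsPublished);
  }
}
```

Tasks other than parallel compaction omit both keys, per the NON_NULL serialization in the payload class below.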

### Live report

When a task is running, a live report containing the ingestion state, unparseable events, and moving averages of the number of events processed over 1-minute, 5-minute, and 15-minute windows can be retrieved at:
IngestionStatsAndErrorsTaskReport.java
@@ -57,7 +57,7 @@ public String getReportKey()
}

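// Covariant override: returning the concrete payload type instead of Object lets
// callers use the typed getters (including the new segment counters) without a cast.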
@Override
- public Object getPayload()
+ public IngestionStatsAndErrorsTaskReportData getPayload()
{
return payload;
}
IngestionStatsAndErrorsTaskReportData.java
@@ -19,6 +19,7 @@

package org.apache.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.IngestionState;

@@ -50,14 +51,21 @@ public class IngestionStatsAndErrorsTaskReportData
@JsonProperty
private Map<String, Long> recordsProcessed;

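// Populated only for compaction tasks; left null, and omitted from JSON, for other task types.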
@JsonProperty
private Long segmentsRead;
@JsonProperty
private Long segmentsPublished;

public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@JsonProperty("rowStats") Map<String, Object> rowStats,
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed,
@JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
- @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed
+ @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
+ @Nullable @JsonProperty("segmentsRead") Long segmentsRead,
+ @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished
)
{
this.ingestionState = ingestionState;
@@ -67,6 +75,8 @@ public IngestionStatsAndErrorsTaskReportData(
this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
this.recordsProcessed = recordsProcessed;
this.segmentsRead = segmentsRead;
this.segmentsPublished = segmentsPublished;
}

@JsonProperty
@@ -113,6 +123,22 @@ public Map<String, Long> getRecordsProcessed()
return recordsProcessed;
}

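// NON_NULL inclusion keeps the two new fields out of serialized reports when they are
// unset, so report payloads for non-compaction tasks are unchanged.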
@JsonProperty
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public Long getSegmentsRead()
{
return segmentsRead;
}

@JsonProperty
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public Long getSegmentsPublished()
{
return segmentsPublished;
}

public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
Map<String, TaskReport> taskReports
)
@@ -137,7 +163,9 @@ public boolean equals(Object o)
Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) &&
Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) &&
- Objects.equals(getRecordsProcessed(), that.getRecordsProcessed());
+ Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) &&
+ Objects.equals(getSegmentsRead(), that.getSegmentsRead()) &&
+ Objects.equals(getSegmentsPublished(), that.getSegmentsPublished());
}

@Override
@@ -150,7 +178,9 @@ public int hashCode()
getErrorMsg(),
isSegmentAvailabilityConfirmed(),
getSegmentAvailabilityWaitTimeMs(),
- getRecordsProcessed()
+ getRecordsProcessed(),
+ getSegmentsRead(),
+ getSegmentsPublished()
);
}

@@ -165,6 +195,8 @@ public String toString()
", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed +
", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
", recordsProcessed=" + recordsProcessed +
", segmentsRead=" + segmentsRead +
", segmentsPublished=" + segmentsPublished +
'}';
}
}
(next changed file)
@@ -621,7 +621,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
errorMsg == null,
0L,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
(next changed file)
@@ -694,7 +694,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
(next changed file)
@@ -602,7 +602,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
ParallelIndexSupervisorTask.java
@@ -203,6 +203,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen

private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
private Long segmentsRead;
private Long segmentsPublished;
private final boolean isCompactionTask;


@@ -643,6 +645,14 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
if (isCompactionTask) {
// Populate segmentsRead only for compaction tasks
segmentsRead = parallelSinglePhaseRunner.getReports()
.values()
.stream()
.mapToLong(report -> report.getOldSegments().size()).sum();
}

if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
@@ -1189,6 +1199,8 @@ private void publishSegments(
} else {
throw new ISE("Failed to publish segments");
}

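// Record how many segments this run published; surfaced through the completion report.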
segmentsPublished = (long) newSegments.size();
}

private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
@@ -1245,7 +1257,9 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed,
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ segmentsRead,
+ segmentsPublished
)
)
);
@@ -1629,6 +1643,7 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab

final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
@@ -1639,6 +1654,13 @@
getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);

buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);

Long segmentsReadFromPartition = ((IngestionStatsAndErrorsTaskReport)
taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
).getPayload().getSegmentsRead();
if (segmentsReadFromPartition != null) {
totalSegmentsRead += segmentsReadFromPartition;
}
}

RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
@@ -1647,6 +1669,9 @@
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
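// Update the task-level count only once at least one subtask has reported segments read.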
if (totalSegmentsRead > 0) {
segmentsRead = totalSegmentsRead;
}

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
(next changed file)
@@ -37,6 +37,8 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -125,7 +127,7 @@ public final TaskStatus runTask(TaskToolbox toolbox) throws Exception
toolbox.getIndexingTmpDir()
);

- Map<String, TaskReport> taskReport = getTaskCompletionReports();
+ Map<String, TaskReport> taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource));

taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport));

@@ -149,6 +151,18 @@ abstract T createGeneratedPartitionsReport(
Map<String, TaskReport> taskReport
);

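/**
 * Counts the input segments when this subtask reads from a DruidInputSource (the
 * compaction case); returns null for any other input source, so the stat is omitted.
 */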
private Long getNumSegmentsRead(InputSource inputSource)
{
if (inputSource instanceof DruidInputSource) {
List<WindowedSegmentId> segments = ((DruidInputSource) inputSource).getSegmentIds();
if (segments != null) {
return (long) segments.size();
}
}

return null;
}

private List<DataSegment> generateSegments(
final TaskToolbox toolbox,
final ParallelIndexSupervisorTaskClient taskClient,
@@ -236,7 +250,7 @@ private List<DataSegment> generateSegments(
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
- private Map<String, TaskReport> getTaskCompletionReports()
+ private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
@@ -248,7 +262,9 @@ public Map<String, TaskReport> getTaskCompletionReports()
"",
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ segmentsRead,
+ null
)
)
);
(next changed file)
@@ -644,7 +644,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
(next changed file)
@@ -1130,7 +1130,9 @@ private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorM
errorMsg,
errorMsg == null,
handoffWaitMs,
- getPartitionStats()
+ getPartitionStats(),
+ null,
+ null
)
)
);
(next changed file)
@@ -250,6 +250,18 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc

List<IngestionStatsAndErrorsTaskReportData> reports = getIngestionReports();
Assert.assertEquals(3, reports.size()); // three index subtasks are run by the single compaction task

// this test reads 3 segments and publishes 6 segments
Assert.assertEquals(
3,
reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum()
);
Assert.assertEquals(
6,
reports.stream()
.mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
.sum()
);
}

@Test
(next changed file)
@@ -70,7 +70,9 @@ public void testSerde() throws Exception
"an error message",
true,
1000L,
- ImmutableMap.of("PartitionA", 5000L)
+ ImmutableMap.of("PartitionA", 5000L),
+ 5L,
+ 10L
)
);
String report1serialized = jsonMapper.writeValueAsString(report1);
@@ -127,6 +129,8 @@ public void testSerializationOnMissingPartitionStats() throws Exception
"an error message",
true,
1000L,
null,
null,
null
)
);
