Enable segments read/published stats on Compaction task completion reports #15947

Merged
27 commits (all by adithyachakilam):
- 3a945ce "Initial commit" (Feb 22, 2024)
- bd820af "Merge remote-tracking branch 'origin/master' into adithyachakilam/ena…" (Feb 26, 2024)
- fc18842 "fix typo" (Feb 26, 2024)
- 010e53d "fix missing coverage" (Feb 27, 2024)
- caa17e7 "change to long" (Feb 27, 2024)
- 44ebd78 "fix tests" (Feb 28, 2024)
- d24f654 "commit suggestions: Update indexing-service/src/main/java/org/apache/…" (Feb 28, 2024)
- e87a709 "Merge remote-tracking branch 'origin/master' into adithyachakilam/ena…" (Mar 4, 2024)
- f8c8c73 "address comments" (Mar 4, 2024)
- c1bd161 "fix-compile-errors" (Mar 4, 2024)
- 70ab762 "license" (Mar 4, 2024)
- d318ff7 "add IT Test" (Mar 4, 2024)
- 7aeaa1d "add another test" (Mar 4, 2024)
- 11c0ead "add test coverage" (Mar 5, 2024)
- 7e3482e "remove throws exception" (Mar 5, 2024)
- 6b2afe4 "missed checkin" (Mar 5, 2024)
- 7312217 "checkstyle fix" (Mar 5, 2024)
- 84dec87 "address comments" (Mar 5, 2024)
- f4a0f0a "fix intellij annotation check" (Mar 5, 2024)
- c5225e0 "comments" (Mar 5, 2024)
- 907854e "remove separeate class" (Mar 5, 2024)
- 02798ca "minor edits" (Mar 6, 2024)
- cabbc72 "comments" (Mar 6, 2024)
- b32acf1 "Merge remote-tracking branch 'origin/master' into adithyachakilam/ena…" (Mar 6, 2024)
- 6296d56 "fix unwanted deleteions" (Mar 6, 2024)
- 98ec4b7 "remove unwanted changes" (Mar 6, 2024)
- 2336a2a "Merge remote-tracking branch 'origin/master' into adithyachakilam/ena…" (Mar 6, 2024)
8 changes: 8 additions & 0 deletions docs/ingestion/tasks.md
@@ -155,6 +155,14 @@ For some task types, the indexing task can wait for the newly ingested segments
|`segmentAvailabilityWaitTimeMs`|Milliseconds the ingestion task waited for the newly ingested segments to become available for query after ingestion completed.|
|`recordsProcessed`|Partitions processed by the ingestion task, with the count of records processed from each partition.|


#### Compaction Task Fields

| Field | Description |
|---------------------|--------------------------------------------------------------|
| `segmentsRead` | Number of segments read by a parallel compaction task. |
| `segmentsPublished` | Number of segments published by a parallel compaction task. |
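Since these counters appear only in parallel compaction completion reports (and are nullable), a consumer of the report payload might read them defensively. A minimal stdlib sketch; the class name, method names, and sample values are illustrative, not part of the Druid API:

```java
import java.util.HashMap;
import java.util.Map;

public class CompactionReportFieldsSketch
{
  // Illustrative payload; real values come from the task reports API.
  static Map<String, Object> samplePayload()
  {
    Map<String, Object> payload = new HashMap<>();
    payload.put("segmentsRead", 24L);
    payload.put("segmentsPublished", 6L);
    return payload;
  }

  // Both counters are optional: non-compaction reports omit them entirely.
  static long fieldOrZero(Map<String, Object> payload, String field)
  {
    Object value = payload.get(field);
    return value == null ? 0L : ((Number) value).longValue();
  }

  public static void main(String[] args)
  {
    Map<String, Object> payload = samplePayload();
    System.out.println("segmentsRead=" + fieldOrZero(payload, "segmentsRead"));
    System.out.println("segmentsPublished=" + fieldOrZero(payload, "segmentsPublished"));
  }
}
```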

### Live report

When a task is running, a live report containing ingestion state, unparseable events and moving average for number of events processed for 1 min, 5 min, 15 min time window can be retrieved at:
IngestionStatsAndErrorsTaskReportData.java
@@ -20,12 +20,19 @@
package org.apache.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.IngestionState;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "kind", defaultImpl = IngestionStatsAndErrorsTaskReport.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "ingestion_stats", value = IngestionStatsAndErrorsTaskReport.class),
@JsonSubTypes.Type(name = "parallel_compaction_stats", value = ParallelCompactionTaskReportData.class)
})
public class IngestionStatsAndErrorsTaskReportData
{
@JsonProperty
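The annotations added above let Jackson pick a concrete report class from the new "kind" property, falling back to IngestionStatsAndErrorsTaskReportData when the property is absent (defaultImpl), which keeps older payloads deserializable. Conceptually the mapping behaves like this stdlib sketch (a simplification: real Jackson would reject an unrecognized kind rather than fall back):

```java
import java.util.Map;

public class KindDispatchSketch
{
  // Stand-in for the @JsonSubTypes mapping: "kind" value -> report class name.
  static final Map<String, String> KIND_TO_CLASS = Map.of(
      "ingestion_stats", "IngestionStatsAndErrorsTaskReportData",
      "parallel_compaction_stats", "ParallelCompactionTaskReportData"
  );

  static String resolve(String kind)
  {
    // Absent "kind" -> defaultImpl, i.e. the base report class.
    if (kind == null) {
      return "IngestionStatsAndErrorsTaskReportData";
    }
    // Simplification: Jackson itself would fail on an unknown kind.
    return KIND_TO_CLASS.getOrDefault(kind, "IngestionStatsAndErrorsTaskReportData");
  }

  public static void main(String[] args)
  {
    System.out.println(resolve(null));
    System.out.println(resolve("parallel_compaction_stats"));
  }
}
```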
ParallelCompactionTaskReportData.java (new file)
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.IngestionState;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;

public class ParallelCompactionTaskReportData extends IngestionStatsAndErrorsTaskReportData
{
@JsonProperty
private Long segmentsRead;
@JsonProperty
private Long segmentsPublished;

public ParallelCompactionTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@JsonProperty("rowStats") Map<String, Object> rowStats,
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed,
@JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
@JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
@JsonProperty("segmentsRead") Long segmentsRead,
@JsonProperty("segmentsPublished") Long segmentsPublished
)
{
super(
ingestionState,
unparseableEvents,
rowStats,
errorMsg,
segmentAvailabilityConfirmed,
segmentAvailabilityWaitTimeMs,
recordsProcessed
);
this.segmentsRead = segmentsRead;
this.segmentsPublished = segmentsPublished;
}

@JsonProperty
@Nullable
public Long getSegmentsRead()
{
return segmentsRead;
}

@JsonProperty
@Nullable
public Long getSegmentsPublished()
{
return segmentsPublished;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ParallelCompactionTaskReportData that = (ParallelCompactionTaskReportData) o;
return super.equals(o) &&
Objects.equals(getSegmentsRead(), that.getSegmentsRead()) &&
Objects.equals(getSegmentsPublished(), that.getSegmentsPublished());
}

@Override
public int hashCode()
{
return Objects.hash(
super.hashCode(),
getSegmentsRead(),
getSegmentsPublished()
);
}

@Override
public String toString()
{
return "ParallelCompactionTaskReportData {" +
"IngestionStatsAndErrorsTaskReportData=" + super.toString() +
", segmentsRead=" + segmentsRead +
", segmentsPublished=" + segmentsPublished +
'}';
}
}
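The equals and hashCode above lean on Objects.equals and Objects.hash precisely because segmentsRead and segmentsPublished may be null. A stripped-down sketch of that pattern (Stats is a hypothetical stand-in, not a Druid class):

```java
import java.util.Objects;

public class NullableStatsEqualitySketch
{
  // Hypothetical stand-in for a report with nullable counters.
  static final class Stats
  {
    final Long segmentsRead;
    final Long segmentsPublished;

    Stats(Long segmentsRead, Long segmentsPublished)
    {
      this.segmentsRead = segmentsRead;
      this.segmentsPublished = segmentsPublished;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Stats)) {
        return false;
      }
      Stats that = (Stats) o;
      // Objects.equals is null-safe: two absent counters compare equal,
      // and an absent counter never equals a present one.
      return Objects.equals(segmentsRead, that.segmentsRead)
             && Objects.equals(segmentsPublished, that.segmentsPublished);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(segmentsRead, segmentsPublished);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(new Stats(null, null).equals(new Stats(null, null)));
    System.out.println(new Stats(4L, null).equals(new Stats(null, 4L)));
  }
}
```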
GeneratedPartitionsMetadataReport.java
@@ -38,9 +38,10 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport
GeneratedPartitionsMetadataReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("partitionStats") List<PartitionStat> partitionStats,
- @JsonProperty("taskReport") Map<String, TaskReport> taskReport
+ @JsonProperty("taskReport") Map<String, TaskReport> taskReport,
+ @JsonProperty("segmentsRead") Long segmentsRead
)
{
- super(taskId, partitionStats, taskReport);
+ super(taskId, partitionStats, taskReport, segmentsRead);
}
}
GeneratedPartitionsReport.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -37,11 +38,19 @@ public class GeneratedPartitionsReport implements SubTaskReport
private final List<PartitionStat> partitionStats;
private final Map<String, TaskReport> taskReport;

- GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, Map<String, TaskReport> taskReport)
+ private final Long segmentsRead;
+
+ GeneratedPartitionsReport(
+ String taskId,
+ List<PartitionStat> partitionStats,
+ Map<String, TaskReport> taskReport,
+ Long segmentsRead
+ )
{
this.taskId = taskId;
this.partitionStats = partitionStats;
this.taskReport = taskReport;
+ this.segmentsRead = segmentsRead;
}

@Override
@@ -63,6 +72,13 @@ public List<PartitionStat> getPartitionStats()
return partitionStats;
}

@JsonProperty
@Nullable
public Long getSegmentsRead()
{
return segmentsRead;
}

@Override
public boolean equals(Object o)
{
@@ -75,13 +91,14 @@ public boolean equals(Object o)
GeneratedPartitionsReport that = (GeneratedPartitionsReport) o;
return Objects.equals(taskId, that.taskId) &&
Objects.equals(partitionStats, that.partitionStats) &&
- Objects.equals(taskReport, that.taskReport);
+ Objects.equals(taskReport, that.taskReport) &&
+ Objects.equals(segmentsRead, that.segmentsRead);
}

@Override
public int hashCode()
{
- return Objects.hash(taskId, partitionStats, taskReport);
+ return Objects.hash(taskId, partitionStats, taskReport, segmentsRead);
}

@Override
@@ -91,6 +108,7 @@ public String toString()
"taskId='" + taskId + '\'' +
", partitionStats=" + partitionStats +
", taskReport=" + taskReport +
", segmentsRead=" + segmentsRead +
'}';
}
}
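Because getSegmentsRead can return null for sub-reports that lack the field, any aggregation over sub-task reports has to skip nulls and only surface a total when something was actually read; the supervisor's merge logic follows this shape. A minimal sketch, with the class and method names hypothetical:

```java
import java.util.List;

public class SegmentsReadAggregationSketch
{
  // Null entries (sub-reports without the field) are skipped; the total is
  // only surfaced when at least one segment was read.
  static Long aggregate(List<Long> perSubtaskSegmentsRead)
  {
    long total = 0L;
    for (Long segmentsRead : perSubtaskSegmentsRead) {
      if (segmentsRead != null) {
        total += segmentsRead;
      }
    }
    return total > 0 ? total : null;
  }

  public static void main(String[] args)
  {
    System.out.println(aggregate(List.of(2L, 3L)));
  }
}
```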
ParallelIndexSupervisorTask.java
@@ -42,6 +42,7 @@
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.ParallelCompactionTaskReportData;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -204,6 +205,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
private final Boolean isCompactionTask;
private Long segmentsRead;
private Long segmentsPublished;


@JsonCreator
@@ -651,6 +654,14 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
segmentsRead = parallelSinglePhaseRunner.getReports()
.values()
.stream()
.mapToLong(report -> report.getOldSegments().size()).sum();
segmentsPublished = parallelSinglePhaseRunner.getReports()
.values()
.stream()
.mapToLong(report -> report.getNewSegments().size()).sum();
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
@@ -822,7 +833,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
TaskStatus taskStatus;
if (state.isSuccess()) {
//noinspection ConstantConditions
- publishSegments(toolbox, mergeRunner.getReports());
+ segmentsPublished = publishSegments(toolbox, mergeRunner.getReports());
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(mergeRunner.getReports());
}
@@ -920,7 +931,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
TaskState mergeState = runNextPhase(mergeRunner);
TaskStatus taskStatus;
if (mergeState.isSuccess()) {
- publishSegments(toolbox, mergeRunner.getReports());
+ segmentsPublished = publishSegments(toolbox, mergeRunner.getReports());
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(mergeRunner.getReports());
}
@@ -1132,7 +1143,7 @@ private static Pair<Integer, Integer> getPartitionBoundaries(int index, int tota
return Pair.of(start, stop);
}

- private void publishSegments(
+ private long publishSegments(
TaskToolbox toolbox,
Map<String, PushedSegmentsReport> reportsMap
)
@@ -1200,6 +1211,8 @@ private void publishSegments(
} else {
throw new ISE("Failed to publish segments");
}

return newSegments.size();
}

private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
@@ -1248,6 +1261,18 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
isCompactionTask ?
new ParallelCompactionTaskReportData(
IngestionState.COMPLETED,
rowStatsAndUnparseableEvents.rhs,
rowStatsAndUnparseableEvents.lhs,
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed,
segmentAvailabilityWaitTimeMs,
Collections.emptyMap(),
segmentsRead,
segmentsPublished
) :
new IngestionStatsAndErrorsTaskReportData(
IngestionState.COMPLETED,
rowStatsAndUnparseableEvents.rhs,
@@ -1633,6 +1658,7 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab

final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
@@ -1643,6 +1669,9 @@
getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);

buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
if (generatedPartitionsReport.getSegmentsRead() != null) {
totalSegmentsRead += generatedPartitionsReport.getSegmentsRead();
}
}

RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
@@ -1651,6 +1680,9 @@
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
if (totalSegmentsRead > 0) {
segmentsRead = totalSegmentsRead;
}

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
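In runSinglePhaseParallel above, segmentsRead and segmentsPublished are derived by streaming over the per-subtask push reports and summing the input (old) and output (new) segment counts. The counting pattern, sketched with PushedReport as a hypothetical stand-in for PushedSegmentsReport:

```java
import java.util.List;
import java.util.Map;

public class SegmentCountSketch
{
  // Hypothetical stand-in for PushedSegmentsReport: segments compacted away
  // (old) and segments produced (new).
  record PushedReport(List<String> oldSegments, List<String> newSegments) {}

  // segmentsRead: total input segments across all sub-tasks.
  static long countOld(Map<String, PushedReport> reports)
  {
    return reports.values().stream().mapToLong(r -> r.oldSegments().size()).sum();
  }

  // segmentsPublished: total output segments across all sub-tasks.
  static long countNew(Map<String, PushedReport> reports)
  {
    return reports.values().stream().mapToLong(r -> r.newSegments().size()).sum();
  }

  public static void main(String[] args)
  {
    Map<String, PushedReport> reports = Map.of(
        "subtask1", new PushedReport(List.of("day1", "day2"), List.of("week1")),
        "subtask2", new PushedReport(List.of("day3"), List.of("week1b"))
    );
    System.out.println(countOld(reports) + " read, " + countNew(reports) + " published");
  }
}
```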
@@ -177,12 +177,17 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
}

@Override
- GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
+ GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(
+ TaskToolbox toolbox,
+ List<DataSegment> segments,
+ Map<String, TaskReport> taskReport,
+ Long segmentsRead
+ )
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
- return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
+ return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport, segmentsRead);
}

/**
@@ -192,11 +192,16 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
}

@Override
- GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
+ GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(
+ TaskToolbox toolbox,
+ List<DataSegment> segments,
+ Map<String, TaskReport> taskReport,
+ Long segmentsRead
+ )
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
- return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
+ return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport, segmentsRead);
}
}
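Both createGeneratedPartitionsReport overrides above repeat the same stream pipeline, mapping each pushed segment to a partition stat before attaching segmentsRead. The shared shape, written generically (placeholder type parameters standing in for DataSegment and PartitionStat; not Druid code):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PartitionStatsSketch
{
  // S stands in for DataSegment, P for PartitionStat; generateStat for the
  // intermediary data manager's generatePartitionStat call.
  static <S, P> List<P> toPartitionStats(List<S> segments, Function<S, P> generateStat)
  {
    return segments.stream().map(generateStat).collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    System.out.println(toPartitionStats(List.of("a", "bb", "ccc"), String::length));
  }
}
```

A shared helper like this could remove the duplication between the two generate-task subclasses, at the cost of one more indirection.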