Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reasoning for choosing shardSpec to the MSQ report #16175

Merged
merged 7 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/api-reference/sql-ingestion-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ The response shows an example report for a query.
},
"pendingTasks": 0,
"runningTasks": 2,
"segmentLoadStatus": {
"segmentLoadWaiterStatus": {
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
"state": "SUCCESS",
"dataSource": "kttm_simple",
"startTime": "2022-09-14T23:12:09.266Z",
Expand All @@ -310,6 +310,10 @@ The response shows an example report for a query.
"onDemandSegments": 0,
"pendingSegments": 0,
"unknownSegments": 0
},
"segmentReport": {
"shardSpec": "NumberedShardSpec",
"details": "Cannot use RangeShardSpec, RangedShardSpec only supports string CLUSTER BY keys. Using NumberedShardSpec instead."
}
},
"stages": [
Expand Down Expand Up @@ -631,6 +635,9 @@ The following table describes the response fields when you retrieve a report for
| `multiStageQuery.payload.status.segmentLoadStatus.onDemandSegments` | The number of segments which are not loaded on any historical, as per the load rules. |
| `multiStageQuery.payload.status.segmentLoadStatus.pendingSegments` | The number of segments remaining to be loaded. |
| `multiStageQuery.payload.status.segmentLoadStatus.unknownSegments` | The number of segments whose status is unknown. |
| `multiStageQuery.payload.status.segmentReport` | Segment report. Only present if the query is an ingestion. |
| `multiStageQuery.payload.status.segmentReport.shardSpec` | Contains the shard spec chosen. |
| `multiStageQuery.payload.status.segmentReport.details` | Contains further reasoning about the shard spec chosen. |
| `multiStageQuery.payload.status.errorReport` | Error object. Only present if there was an error. |
| `multiStageQuery.payload.status.errorReport.taskId` | The task that reported the error, if known. May be a controller task or a worker task. |
| `multiStageQuery.payload.status.errorReport.host` | The hostname and port of the task that reported the error, if known. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
Expand Down Expand Up @@ -311,6 +312,8 @@ public class ControllerImpl implements Controller
private final boolean isFaultToleranceEnabled;
private final boolean isFailOnEmptyInsertEnabled;
private volatile SegmentLoadStatusFetcher segmentLoadWaiter;
@Nullable
private MSQSegmentReport segmentReport;

public ControllerImpl(
final MSQControllerTask task,
Expand Down Expand Up @@ -563,7 +566,8 @@ public TaskStatus runTask(final Closer closer)
queryStartTime,
new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher,
segmentLoadWaiter
segmentLoadWaiter,
segmentReport
),
stagesReport,
countersSnapshot,
Expand Down Expand Up @@ -933,7 +937,8 @@ public Map<String, TaskReport> liveReports()
queryStartTime,
queryStartTime == null ? -1L : new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher,
segmentLoadWaiter
segmentLoadWaiter,
segmentReport
),
makeStageReport(
queryDef,
Expand Down Expand Up @@ -1097,12 +1102,16 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForReplace(
final SegmentIdWithShardSpec[] retVal = new SegmentIdWithShardSpec[partitionBoundaries.size()];
final Granularity segmentGranularity = destination.getSegmentGranularity();
final List<String> shardColumns;

if (mayHaveMultiValuedClusterByFields) {
// DimensionRangeShardSpec cannot handle multi-valued fields.
shardColumns = Collections.emptyList();
final Pair<List<String>, String> shardReasonPair;

shardReasonPair = computeShardColumns(signature, clusterBy, task.getQuerySpec().getColumnMappings(), mayHaveMultiValuedClusterByFields);
shardColumns = shardReasonPair.lhs;
String reason = shardReasonPair.rhs;
log.info(StringUtils.format("ShardSpec chosen: %s", reason));
if (shardColumns.isEmpty()) {
segmentReport = new MSQSegmentReport(NumberedShardSpec.class.getSimpleName(), reason);
} else {
shardColumns = computeShardColumns(signature, clusterBy, task.getQuerySpec().getColumnMappings());
segmentReport = new MSQSegmentReport(DimensionRangeShardSpec.class.getSimpleName(), reason);
}

// Group partition ranges by bucket (time chunk), so we can generate shardSpecs for each bucket independently.
Expand Down Expand Up @@ -2036,19 +2045,24 @@ private static boolean isTimeBucketedIngestion(final MSQSpec querySpec)
* Compute shard columns for {@link DimensionRangeShardSpec}. Returns an empty list if range-based sharding
* is not applicable.
*/
private static List<String> computeShardColumns(
private static Pair<List<String>, String> computeShardColumns(
final RowSignature signature,
final ClusterBy clusterBy,
final ColumnMappings columnMappings
final ColumnMappings columnMappings,
boolean mayHaveMultiValuedClusterByFields
)
{
if (mayHaveMultiValuedClusterByFields) {
// DimensionRangeShardSpec cannot handle multi-valued fields.
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, the fields in the CLUSTER BY clause contains a multivalues. Using NumberedShardSpec instead.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: grammar
Also, if its possible to pinpoint the multiValue fields without much refactoring, then we can mention that here.

Suggested change
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, the fields in the CLUSTER BY clause contains a multivalues. Using NumberedShardSpec instead.");
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, the fields in the CLUSTERED BY clause contains multivalues in column [%s]. Using NumberedShardSpec instead.");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have the column name at this point, we only store a boolean mayContainMultivalues. Updated the message a bit

}
final List<KeyColumn> clusterByColumns = clusterBy.getColumns();
final List<String> shardColumns = new ArrayList<>();
final boolean boosted = isClusterByBoosted(clusterBy);
final int numShardColumns = clusterByColumns.size() - clusterBy.getBucketByCount() - (boosted ? 1 : 0);

if (numShardColumns == 0) {
return Collections.emptyList();
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, as there are no shardColumns. Using NumberedShardSpec instead.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the user doesn't supply the clustered by. In that case, the reason doesn't seem necessary, or it can be reworded.

Suggested change
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, as there are no shardColumns. Using NumberedShardSpec instead.");
return Pair.of(Collections.emptyList(), "Using NumberedShardSpec as no columns are supplied in the 'CLUSTERED BY' clause.");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

}

for (int i = clusterBy.getBucketByCount(); i < clusterBy.getBucketByCount() + numShardColumns; i++) {
Expand All @@ -2057,25 +2071,25 @@ private static List<String> computeShardColumns(

// DimensionRangeShardSpec only handles ascending order.
if (column.order() != KeyOrder.ASCENDING) {
return Collections.emptyList();
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, RangedShardSpec only supports ascending CLUSTER BY keys. Using NumberedShardSpec instead.");
}

ColumnType columnType = signature.getColumnType(column.columnName()).orElse(null);

// DimensionRangeShardSpec only handles strings.
if (!(ColumnType.STRING.equals(columnType))) {
return Collections.emptyList();
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, RangedShardSpec only supports string CLUSTER BY keys. Using NumberedShardSpec instead.");
}

// DimensionRangeShardSpec only handles columns that appear as-is in the output.
if (outputColumns.isEmpty()) {
return Collections.emptyList();
return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, RangeShardSpec only supports columns that appear as-is in the output. Using NumberedShardSpec instead.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does as-is mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the message to "Could not find output column name for column [%s]" to include the column name. I'm not sure what conditions would cause the output column to not be found here.

}

shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
}

return shardColumns;
return Pair.of(shardColumns, "Using RangeShardSpec to generate segments.");
}

/**
Expand Down Expand Up @@ -2351,7 +2365,8 @@ private static MSQStatusReport makeStatusReport(
@Nullable final DateTime queryStartTime,
final long queryDuration,
MSQWorkerTaskLauncher taskLauncher,
final SegmentLoadStatusFetcher segmentLoadWaiter
final SegmentLoadStatusFetcher segmentLoadWaiter,
@Nullable MSQSegmentReport msqSegmentReport
)
{
int pendingTasks = -1;
Expand All @@ -2376,7 +2391,8 @@ private static MSQStatusReport makeStatusReport(
workerStatsMap,
pendingTasks,
runningTasks,
status
status,
msqSegmentReport
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.report;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Immutable report describing which {@code ShardSpec} was chosen for segment generation
 * during an MSQ ingestion, along with the reasoning behind the choice.
 *
 * <p>Serialized by Jackson into the MSQ task report under
 * {@code multiStageQuery.payload.status.segmentReport}. Only present when the query is an
 * ingestion — TODO confirm against the report-assembly code.</p>
 */
public class MSQSegmentReport
{
  // Simple class name of the chosen shard spec, e.g. "NumberedShardSpec".
  // Fields are private final: this is a write-once value object.
  private final String shardSpec;
  // Human-readable explanation of why this shard spec was selected.
  private final String details;

  /**
   * @param shardSpec simple class name of the shard spec that was chosen
   * @param reason    human-readable reasoning for the choice; exposed as "details"
   */
  @JsonCreator
  public MSQSegmentReport(@JsonProperty("shardSpec") String shardSpec, @JsonProperty("details") String reason)
  {
    this.shardSpec = shardSpec;
    this.details = reason;
  }

  @JsonProperty
  public String getShardSpec()
  {
    return shardSpec;
  }

  @JsonProperty
  public String getDetails()
  {
    return details;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class MSQStatusReport
@Nullable
private final SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus;

@Nullable
private final MSQSegmentReport segmentReport;

@JsonCreator
public MSQStatusReport(
@JsonProperty("status") TaskState status,
Expand All @@ -69,7 +72,8 @@ public MSQStatusReport(
@JsonProperty("workers") Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
@JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
@JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus,
@JsonProperty("segmentReport") @Nullable MSQSegmentReport segmentReport
)
{
this.status = Preconditions.checkNotNull(status, "status");
Expand All @@ -81,6 +85,7 @@ public MSQStatusReport(
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
this.segmentReport = segmentReport;
}

@JsonProperty
Expand Down Expand Up @@ -144,6 +149,14 @@ public SegmentLoadStatusFetcher.SegmentLoadWaiterStatus getSegmentLoadWaiterStat
return segmentLoadWaiterStatus;
}

@JsonProperty("segmentReport")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public MSQSegmentReport getSegmentReport()
{
return segmentReport;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testSerdeResultsReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status, null),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
Expand Down Expand Up @@ -173,7 +173,7 @@ public void testSerdeErrorReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status, null),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
Expand Down Expand Up @@ -221,7 +221,7 @@ public void testWriteTaskReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status, null),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new HashMap<>(),
1,
2,
null,
null
),
MSQStagesReport.create(
Expand Down Expand Up @@ -314,6 +315,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new HashMap<>(),
1,
2,
null,
null
),
MSQStagesReport.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void testDistinctPartitionsOnEachWorker()
new HashMap<>(),
1,
2,
null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
Expand Down Expand Up @@ -109,6 +110,7 @@ public void testOnePartitionOnEachWorker()
new HashMap<>(),
1,
2,
null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
Expand Down Expand Up @@ -149,6 +151,7 @@ public void testCommonPartitionsOnEachWorker()
new HashMap<>(),
1,
2,
null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
Expand Down Expand Up @@ -187,6 +190,7 @@ public void testNullChannelCounters()
new HashMap<>(),
1,
2,
null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
Expand Down Expand Up @@ -226,6 +230,7 @@ public void testConsecutivePartitionsOnEachWorker()
new HashMap<>(),
1,
2,
null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
Expand Down Expand Up @@ -265,6 +270,7 @@ public void testEmptyCountersForDurableStorageDestination()
new HashMap<>(),
1,
2,
null,
null
),
MSQStagesReport.create(
Expand Down Expand Up @@ -301,6 +307,7 @@ public void testEmptyCountersForTaskReportDestination()
new HashMap<>(),
1,
2,
null,
null
),
MSQStagesReport.create(
Expand Down Expand Up @@ -339,6 +346,7 @@ public void testEmptyCountersForDataSourceDestination()
new HashMap<>(),
1,
2,
null,
null
),
MSQStagesReport.create(
Expand Down