Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable final stages to MSQ ingestion queries #16699

Merged
merged 19 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.FrameProcessorFactory;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
Expand Down Expand Up @@ -1193,8 +1194,35 @@ private Int2ObjectMap<Object> makeWorkerFactoryInfosForStage(
{
if (MSQControllerTask.isIngestion(querySpec) &&
stageNumber == queryDef.getFinalStageDefinition().getStageNumber()) {
// noinspection unchecked,rawtypes
return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate);
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
if (!destination.doesSegmentMorphing()) {
// noinspection unchecked,rawtypes
return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate);
} else {
// worker info is the new lock version
if (destination.getReplaceTimeChunks().size() != 1) {
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
throw new ISE(
"Must have single interval in replaceTimeChunks, but got[%s]",
destination.getReplaceTimeChunks()
);
}
try {
final List<TaskLock> locks;
locks = context.taskActionClient().submit(new LockListAction());
if (locks.size() == 1) {
final Int2ObjectMap<Object> retVal = new Int2ObjectAVLTreeMap<>();
for (int worker : workerInputs.workers()) {
retVal.put(worker, locks.get(0).getVersion());
}
return retVal;
} else {
throw new ISE("Got number of locks other than one: [%s]", locks);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
} else {
return null;
}
Expand Down Expand Up @@ -1810,6 +1838,35 @@ private static QueryDefinition makeQueryDefinition(
}
}

// Possibly add a segment morpher stage.
if (((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) {
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
final FrameProcessorFactory segmentMorphFactory = destination.getSegmentMorphFactory();

if (!destination.isReplaceTimeChunks()) {
throw new MSQException(UnknownFault.forMessage("segmentMorphFactory requires replaceTimeChunks"));
}

builder.add(
StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(
new TableInputSpec(
destination.getDataSource(),
destination.getReplaceTimeChunks(),
null,
null
),
new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())
)
.broadcastInputs(IntSet.of(1))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.processorFactory(segmentMorphFactory)
);

// If there was a segment morpher, return immediately; don't add a segment-generation stage.
return builder.build();
}

// Then, add a segment-generation stage.
final DataSchema dataSchema =
makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
Expand Down Expand Up @@ -2723,7 +2780,8 @@ private void startStages() throws IOException, InterruptedException
for (final StageId stageId : newStageIds) {
// Allocate segments, if this is the final stage of an ingestion.
if (MSQControllerTask.isIngestion(querySpec)
&& stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()) {
&& stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()
&& !((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) {
populateSegmentsToGenerate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.apache.druid.msq.indexing.destination;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.kernel.FrameProcessorFactory;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.server.security.Resource;
Expand All @@ -49,18 +51,24 @@ public class DataSourceMSQDestination implements MSQDestination
@Nullable
private final List<Interval> replaceTimeChunks;

@Nullable
@SuppressWarnings("rawtypes")
private final FrameProcessorFactory segmentMorphFactory;

@JsonCreator
public DataSourceMSQDestination(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("segmentSortOrder") @Nullable List<String> segmentSortOrder,
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks,
@JsonProperty("segmentMorphFactory") @Nullable FrameProcessorFactory segmentMorphFactory
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity");
this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList();
this.replaceTimeChunks = replaceTimeChunks;
this.segmentMorphFactory = segmentMorphFactory;

if (replaceTimeChunks != null) {
// Verify that if replaceTimeChunks is provided, it is nonempty.
Expand Down Expand Up @@ -98,6 +106,30 @@ public String getDataSource()
return dataSource;
}

/**
 * Returns the segment morph factory, or null if none is configured.
 * <p>
 * When present, the segment morph factory directs the MSQ task to funnel the results of the final query stage into
 * this {@link FrameProcessorFactory} instead of into a segment-generation stage, so existing segments are modified
 * rather than new ones created.
 * <p>
 * Omitted from the serialized JSON when null.
 */
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public FrameProcessorFactory getSegmentMorphFactory()
{
return segmentMorphFactory;
}

/**
 * Whether this destination is configured with a segment morph factory.
 * <p>
 * When true, the query modifies existing segments via the morph factory rather than generating new segments.
 * Not serialized: derived entirely from {@link #getSegmentMorphFactory()}.
 */
@JsonIgnore
public boolean doesSegmentMorphing()
{
return this.segmentMorphFactory != null;
}

@JsonProperty
public Granularity getSegmentGranularity()
{
Expand Down Expand Up @@ -158,13 +190,14 @@ public boolean equals(Object o)
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(segmentGranularity, that.segmentGranularity)
&& Objects.equals(segmentSortOrder, that.segmentSortOrder)
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
&& Objects.equals(segmentMorphFactory, that.segmentMorphFactory);
}

@Override
public int hashCode()
{
// Keep in sync with equals(): both must cover the same five fields, including segmentMorphFactory.
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, segmentMorphFactory);
}

@Override
Expand All @@ -175,6 +208,7 @@ public String toString()
", segmentGranularity=" + segmentGranularity +
", segmentSortOrder=" + segmentSortOrder +
", replaceTimeChunks=" + replaceTimeChunks +
(segmentMorphFactory != null ? ", segmentMorphFactory=" + segmentMorphFactory : "") +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
targetDataSource.getDestinationName(),
segmentGranularityObject,
segmentSortOrder,
replaceTimeChunks
replaceTimeChunks,
null
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
);
MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
dataSourceMSQDestination.isReplaceTimeChunks());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void testIngestWithSanitizedNullByte() throws IOException
new ColumnMapping("v1", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(DEFAULT_MSQ_CONTEXT)
Expand Down Expand Up @@ -318,7 +318,7 @@ public void testIngestWithSanitizedNullByteUsingContextParameter() throws IOExce
new ColumnMapping("agent_category", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(runtimeContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class MSQControllerTaskTest
"target",
Granularities.DAY,
null,
INTERVALS
INTERVALS,
null
))
.query(new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public class SqlStatementResourceTest extends MSQTestBase
"test",
Granularities.DAY,
null,
null,
null
))
.tuningConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ public void testEmptyCountersForDataSourceDestination()
"test",
Granularities.DAY,
null,
null,
null
)
);
Expand Down
Loading