Window function on msq #15470

Merged 54 commits on Mar 28, 2024

Changes from 4 commits

Commits
4ef900d
Initial code
somu-imply Nov 14, 2023
5e8dab1
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Nov 17, 2023
9ea01fc
Hacky way of at least getting things to work
somu-imply Nov 30, 2023
9c4ac74
Temp unfinished changes
somu-imply Dec 1, 2023
54f9ac3
Converting rac back to frames
somu-imply Dec 7, 2023
d6cef47
Working UTs
somu-imply Dec 7, 2023
40a18f1
Fixing running window function in console
somu-imply Dec 8, 2023
3ecc96a
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Dec 31, 2023
7e34aa8
Adding shuffle spec and separating out stages for each window
somu-imply Jan 3, 2024
f5a1f59
serde stuff by adding ops to proc factory
somu-imply Jan 8, 2024
ab6e317
Updating for first set of reviews
somu-imply Jan 9, 2024
f1efec3
Changes for partition boundary detection
somu-imply Jan 16, 2024
1dae450
cleaning up some code, adding some tests
somu-imply Jan 19, 2024
ccfe473
Fixing up shuffle in group by if window afterwards
somu-imply Jan 19, 2024
500f54f
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 20, 2024
98f4ba5
fix after merge
somu-imply Jan 20, 2024
ee61333
More updates and ignoring the insert cases FOR NOW..
somu-imply Jan 20, 2024
8f8bfdc
A possible fix for the insert case
somu-imply Jan 20, 2024
ec1f164
Support for leaf operators in window functions in MSQ
somu-imply Jan 23, 2024
465598a
in case of MSQ engine planning the query with leafOperator as a windo…
somu-imply Jan 24, 2024
d490d78
Removing inspection profile
somu-imply Jan 24, 2024
836b693
Revert "Removing inspection profile"
somu-imply Jan 24, 2024
aee40ba
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 24, 2024
3f3d1b0
Updating inspection profile
somu-imply Jan 24, 2024
aa1b753
Updating scan query kit to handle shuffle for next window
somu-imply Jan 24, 2024
634d5bc
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 27, 2024
bd5f27b
Some comments addressed
somu-imply Jan 29, 2024
5338b76
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 29, 2024
b688755
Throwing exceptions in 2 cases
somu-imply Jan 29, 2024
ceff35d
Moving examples to a new file and adding new examples with join
somu-imply Feb 1, 2024
fc2e2b6
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Feb 1, 2024
751e947
More tests now window functions over unnest
somu-imply Feb 1, 2024
d7840a3
Addressing review comments part 1
somu-imply Feb 2, 2024
681ee1b
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Feb 5, 2024
b487d2a
Changes to frame processor to move from base leaf to frame processor …
somu-imply Feb 5, 2024
c3baa1d
Some more refactoring and addressing comments
somu-imply Feb 5, 2024
9768da2
Updating more tests
somu-imply Feb 6, 2024
584fa8f
Making build pass
somu-imply Feb 6, 2024
0813cc8
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 14, 2024
d3755e1
Addressing latest set of comments part 1
somu-imply Mar 14, 2024
c0c74a0
Addressing latest review comments part 2
somu-imply Mar 14, 2024
60c6290
Minor refactoring and new tests after review
somu-imply Mar 20, 2024
91edf9e
Adding guardrails for materialization and avoiding partition boosting…
somu-imply Mar 20, 2024
399a78c
Adding more javadocs, guardrails and tests
somu-imply Mar 21, 2024
806c801
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 21, 2024
04424d7
Changes to one test case
somu-imply Mar 21, 2024
f0946b1
More fixes around guardrails and addressing last set of review comments
somu-imply Mar 27, 2024
ea7882c
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 27, 2024
83c96b9
Fixing a testcase after the merge
somu-imply Mar 27, 2024
cfca6a5
Fixing a test by using correct in filters for sql compat mode
somu-imply Mar 27, 2024
c3e2c29
Not documenting context flag and 1 more test change
somu-imply Mar 27, 2024
1464dae
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 27, 2024
520ab4e
New test for inner limit on group by
somu-imply Mar 28, 2024
16b75ce
Adding to known issues
somu-imply Mar 28, 2024
@@ -166,6 +166,7 @@
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
import org.apache.druid.msq.querykit.scan.ScanQueryKit;
@@ -184,6 +185,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
@@ -1172,6 +1174,7 @@ private QueryKit makeQueryControllerToolKit()
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
.put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper()))
.build();

return new MultiQueryKit(kitMap);
@@ -40,7 +40,7 @@
import java.util.List;
import java.util.function.Function;

public abstract class BaseLeafFrameProcessor implements FrameProcessor<Object>
{
private final ReadableInput baseInput;
private final ResourceHolder<WritableFrameChannel> outputChannelHolder;
@@ -63,6 +63,7 @@ protected BaseLeafFrameProcessor(
@Override
public List<ReadableFrameChannel> inputChannels()
{
// somu: need to clarify if the data is in broker only
if (baseInput.hasSegment()) {
return Collections.emptyList();
} else {
@@ -0,0 +1,172 @@
package org.apache.druid.msq.querykit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameSegment;
import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.SegmentToRowsAndColumnsOperator;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowAndColumns;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.timeline.SegmentId;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor
{

private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class);
private final WindowOperatorQuery query;
private final ObjectMapper jsonMapper;
Contributor: Can the object mapper be declared static?

private final SettableLongVirtualColumn partitionBoostVirtualColumn;
private final VirtualColumns frameWriterVirtualColumns;
private final Closer closer = Closer.create();

private Cursor cursor;
private Segment segment;
private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE);
private FrameWriter frameWriter;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed

public WindowOperatorQueryFrameProcessor(
final WindowOperatorQuery query,
final ObjectMapper jsonMapper,
final ReadableInput baseInput,
final Function<SegmentReference, SegmentReference> segmentMapFn,
final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder
)
{
super(
baseInput,
segmentMapFn,
outputChannelHolder,
frameWriterFactoryHolder
);
this.query = query;
this.jsonMapper = jsonMapper;
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);

final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
frameWriterVirtualColumns.add(partitionBoostVirtualColumn);

final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);

if (segmentGranularityVirtualColumn != null) {
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
}

this.frameWriterVirtualColumns = VirtualColumns.create(frameWriterVirtualColumns);
}
// deep storage
@Override
protected ReturnOrAwait<Unit> runWithSegment(SegmentWithDescriptor segment) throws IOException
{
return null;
}

// realtime
@Override
protected ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor segment) throws IOException
{
return null;
}

// previous stage output
@Override
protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChannel, FrameReader inputFrameReader)
throws IOException
{
// Read the frames from the channel
// convert to FrameRowsAndColumns

if (inputChannel.canRead()) {
Frame f = inputChannel.read();
final FrameSegment frameSegment = new FrameSegment(f, inputFrameReader, SegmentId.dummy("x"));

// the frame here is row-based, but FrameRowsAndColumns needs columnar frames. Discuss with Eric.
// Action item: need to implement a new RowsAndColumns that accepts a row-based frame
RowBasedFrameRowAndColumns frameRowsAndColumns = new RowBasedFrameRowAndColumns(f, inputFrameReader.signature());
LazilyDecoratedRowsAndColumns ldrc = new LazilyDecoratedRowsAndColumns(frameRowsAndColumns, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null);
// Create an operator on top of the created rows and columns
Operator op = new Operator()
{
@Nullable
@Override
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
{
receiver.push(ldrc);
receiver.completed();
return continuationObject;
}
};
//Operator op = new SegmentToRowsAndColumnsOperator(frameSegment);
// Chain the operators from the query on top of the operator created above
for (OperatorFactory of : query.getOperators()) {
op = of.wrap(op);
}

// Let the operator run; the results arriving at the receiver must be pushed to the
// output channel, which requires transforming the output RowsAndColumns back into frames
Operator.go(op, new Operator.Receiver()
{
@Override
public Operator.Signal push(RowsAndColumns rac)
{
//outputFrameChannel.output(rac.toFrame());
return Operator.Signal.GO;
}

@Override
public void completed()
{

}
});

} else if (inputChannel.isFinished()) {
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
return ReturnOrAwait.runAgain();
}
}
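The receiver above currently discards its results (the output write is still commented out). A minimal sketch of the step the comments describe — converting each RowsAndColumns back to a frame and pushing it downstream — could look like the following. This is not the PR's final code: racToFrame is a hypothetical helper standing in for the conversion a later commit ("Converting rac back to frames") works toward, and outputChannel stands for the WritableFrameChannel held by the base processor; the types used (Operator, RowsAndColumns, Frame) are already imported in this file.

// Hedged sketch, not the PR's code: forward operator results to the output channel.
Operator.go(op, new Operator.Receiver()
{
  @Override
  public Operator.Signal push(RowsAndColumns rac)
  {
    try {
      final Frame frame = racToFrame(rac);  // hypothetical RowsAndColumns-to-Frame conversion
      outputChannel.write(frame);           // assumed channel from outputChannelHolder
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
    return Operator.Signal.GO;
  }

  @Override
  public void completed()
  {
    // Frames are written eagerly in push(), so there is nothing to flush here.
  }
});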
@@ -0,0 +1,54 @@
package org.apache.druid.msq.querykit;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.segment.SegmentReference;

import java.util.function.Function;

@JsonTypeName("window")
public class WindowOperatorQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactory
{
private final WindowOperatorQuery query;

@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(@JsonProperty("query") WindowOperatorQuery query)
{
super(query);
this.query = Preconditions.checkNotNull(query, "query");
}

@JsonProperty
public WindowOperatorQuery getQuery()
{
return query;
}

@Override
protected FrameProcessor<Object> makeProcessor(
ReadableInput baseInput,
Function<SegmentReference, SegmentReference> segmentMapFn,
ResourceHolder<WritableFrameChannel> outputChannelHolder,
ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
FrameContext frameContext
)
{
return new WindowOperatorQueryFrameProcessor(
query,
frameContext.jsonMapper(),
baseInput,
segmentMapFn,
outputChannelHolder,
frameWriterFactoryHolder
);
}
}
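Because the factory is annotated @JsonTypeName("window") and exposes its query through @JsonProperty, it should round-trip through Jackson like the other frame processor factories. A hedged sketch, where mapper is assumed to be an ObjectMapper with the MSQ modules registered and windowQuery an existing WindowOperatorQuery (both assumptions, not code from this PR):

// Hedged serde sketch; runs inside a method that declares JsonProcessingException.
final WindowOperatorQueryFrameProcessorFactory factory =
    new WindowOperatorQueryFrameProcessorFactory(windowQuery);
final String json = mapper.writeValueAsString(factory);  // serialized with "type": "window"
final WindowOperatorQueryFrameProcessorFactory restored =
    mapper.readValue(json, WindowOperatorQueryFrameProcessorFactory.class);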
@@ -0,0 +1,64 @@
package org.apache.druid.msq.querykit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.query.Query;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.segment.column.RowSignature;

public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
private final ObjectMapper jsonMapper;

public WindowOperatorQueryKit(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}
@Override
public QueryDefinition makeQueryDefinition(
String queryId,
WindowOperatorQuery originalQuery,
QueryKit<Query<?>> queryKit,
ShuffleSpecFactory resultShuffleSpecFactory,
int maxWorkerCount,
int minStageNumber
)
{
// need to validate query first

final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder().queryId(queryId);
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKit,
queryId,
originalQuery.context(),
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
null,
maxWorkerCount,
minStageNumber,
false
);


dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);

final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
RowSignature rowSignature = queryToRun.getRowSignature();

// Create a new stage which takes in the subquery as an input
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber)
.inputs(new StageInputSpec(firstStageNumber - 1))
.signature(rowSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(null)
Contributor Author: Currently the shuffle spec is null. Tell the previous stage to shuffle by the appropriate partition here so that the data comes correctly. For example, if the previous stage is a groupByPostShuffle, find a way to tell it to set a shuffle spec for the next stage. Since the inner query has no idea of the outer operators, we can use the context to pass the information.

Contributor: The shuffle spec for a stage tells it how to partition the data for the next stage. Therefore it should use a combination of the resultShuffleSpecFactory to construct the final shuffleSpec. If you want the data in a particular format inside a stage, its input should always be a stage, and the shuffle spec of that stage should be set accordingly. Hash Shuffle uses similar logic.

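A minimal sketch of the direction these comments suggest, under stated assumptions: partitionColumns (the window's PARTITION BY columns, extracted from the query's operators) is hypothetical, and the key/shuffle types are the usual MSQ ones.

import java.util.ArrayList;
import java.util.List;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.msq.kernel.ShuffleSpec;

// Hedged sketch, not the PR's final code: build a shuffle spec from the window's
// partition columns so the upstream stage partitions its output to match.
static ShuffleSpec windowShuffleSpec(
    final List<String> partitionColumns,               // hypothetical input
    final ShuffleSpecFactory resultShuffleSpecFactory
)
{
  final List<KeyColumn> keyColumns = new ArrayList<>();
  for (String column : partitionColumns) {
    keyColumns.add(new KeyColumn(column, KeyOrder.ASCENDING));
  }
  // bucketByCount of 0: partition purely by the key, with no secondary bucketing
  return resultShuffleSpecFactory.build(new ClusterBy(keyColumns, 0), false);
}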
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(queryToRun))
);
return queryDefBuilder.queryId(queryId).build();
}
}
@@ -149,18 +149,23 @@ public ReturnOrAwait<Object> runIncrementally(final IntSet readableInputs) throws
}

final int fullRowSize = query.getResultRowSignature().size();
rowSupplierFromFrameCursor = new Supplier<ResultRow>()
{
  @Override
  public ResultRow get()
  {
    final ResultRow row = ResultRow.create(fullRowSize);
    for (int i = 0; i < fieldSuppliers.length; i++) {
      row.set(i, fieldSuppliers[i].get());
    }

    for (int i = fieldSuppliers.length; i < fullRowSize; i++) {
      // Post-aggregators.
      row.set(i, null);
    }

    return row;
  }
};
}
}
@@ -194,6 +194,9 @@ private static ScanQuery prepareScanQueryForDataServer(@NotNull ScanQuery scanQuery)
@Override
protected ReturnOrAwait<Unit> runWithLoadedSegment(final SegmentWithDescriptor segment) throws IOException
{
// Try to run with the segment on the data server;
// if the segment was not found for some reason,
// fall back to runWithSegment, which loads it from deep storage
if (cursor == null) {
ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
final Pair<LoadedSegmentDataProvider.DataServerQueryStatus, Yielder<Object[]>> statusSequencePair =
@@ -244,7 +247,9 @@ protected ReturnOrAwait<Unit> runWithLoadedSegment(final SegmentWithDescriptor segment)
@Override
protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment) throws IOException
{
// why not remove order by and limit here?
if (cursor == null) {
// load from deep storage
final ResourceHolder<Segment> segmentHolder = closer.register(segment.getOrLoad());

final Yielder<Cursor> cursorYielder = Yielders.each(
@@ -113,9 +113,9 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
case TOPN_QUERY:
case TIME_BOUNDARY_QUERY:
case GROUPING_SETS:
case ALLOW_TOP_LEVEL_UNION_ALL:
return false;
case WINDOW_FUNCTIONS:
case UNNEST:
case CAN_SELECT:
case CAN_INSERT: