Skip to content

Commit

Permalink
Add Sort Operator for Window Functions (#13619)
Browse files Browse the repository at this point in the history
* Addition of NaiveSortMaker and Default implementation

Add the NaiveSortMaker which makes a sorter
object and a default implementation of the
interface.

This also allows us to plan multiple different window 
definitions on the same query.
  • Loading branch information
imply-cheddar committed Jan 6, 2023
1 parent 4ee4d99 commit f1821a7
Show file tree
Hide file tree
Showing 33 changed files with 2,550 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public class MSQWarningsTest extends MSQTestBase
@Before
public void setUp3() throws IOException
{
toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/unparseable.gz");
File tempFile = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/unparseable.gz");

// Rename the file and the file's extension from .tmp to .gz to prevent issues with 'parsing' the file
toRead = new File(tempFile.getParentFile(), "unparseable.gz");
tempFile.renameTo(toRead);
toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());

rowSignature = RowSignature.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,17 @@
* Base test runner for running MSQ unit tests. It sets up multi stage query execution environment
* and populates data for the datasources. The runner does not go via the HTTP layer for communication between the
* various MSQ processes.
*
* <p>
* Controller -> Coordinator (Coordinator is mocked)
*
* <p>
* In the Ut's we go from:
* {@link MSQTaskQueryMaker} -> {@link MSQTestOverlordServiceClient} -> {@link Controller}
*
*
* <p>
* <p>
* Controller -> Worker communication happens in {@link MSQTestControllerContext}
*
* <p>
* Worker -> Controller communication happens in {@link MSQTestControllerClient}
*
* <p>
* Controller -> Overlord communication happens in {@link MSQTestTaskActionClient}
*/
public class MSQTestBase extends BaseCalciteQueryTest
Expand Down Expand Up @@ -258,7 +258,8 @@ public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);

builder.addModule(new DruidModule() {
builder.addModule(new DruidModule()
{

// Small subset of MsqSqlModule
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.operator;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class ColumnWithDirection
{
public static ColumnWithDirection ascending(String column)
{
return new ColumnWithDirection(column, Direction.ASC);
}

public static ColumnWithDirection descending(String column)
{
return new ColumnWithDirection(column, Direction.DESC);
}

public enum Direction
{
ASC(1),
DESC(-1);

private final int directionInt;

Direction(int directionInt)
{
this.directionInt = directionInt;
}

public int getDirectionInt()
{
return directionInt;
}
}

private final String columnName;
private final Direction direction;

@JsonCreator
public ColumnWithDirection(
@JsonProperty("column") String columnName,
@JsonProperty("direction") Direction direction
)
{
this.columnName = columnName;
this.direction = direction;
}

@JsonProperty("column")
public String getColumn()
{
return columnName;
}

@JsonProperty("direction")
public Direction getDirection()
{
return direction;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof ColumnWithDirection)) {
return false;
}
ColumnWithDirection that = (ColumnWithDirection) o;
return Objects.equals(columnName, that.columnName) && direction == that.direction;
}

@Override
public int hashCode()
{
return Objects.hash(columnName, direction);
}

@Override
public String toString()
{
return "ColumnWithDirection{" +
"columnName='" + columnName + '\'' +
", direction=" + direction +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.apache.druid.query.operator;

import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -60,9 +60,9 @@ public void go(Receiver receiver)
@Override
public boolean push(RowsAndColumns rac)
{
SortedGroupPartitioner groupPartitioner = rac.as(SortedGroupPartitioner.class);
ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
if (groupPartitioner == null) {
groupPartitioner = new DefaultSortedGroupPartitioner(rac);
groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
}

partitionsIter = groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.operator;

import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker;

import java.util.ArrayList;

/**
* A naive sort operator is an operation that sorts a stream of data in-place. Generally speaking this means
* that it has to accumulate all of the data of its child operator first before it can sort. This limitation
* means that hopefully this operator is only planned in a very small number of circumstances.
*/
public class NaiveSortOperator implements Operator
{
private final Operator child;
private final ArrayList<ColumnWithDirection> sortColumns;

public NaiveSortOperator(
Operator child,
ArrayList<ColumnWithDirection> sortColumns
)
{
this.child = child;
this.sortColumns = sortColumns;
}

@Override
public void go(Receiver receiver)
{
child.go(
new Receiver()
{
NaiveSortMaker.NaiveSorter sorter = null;

@Override
public boolean push(RowsAndColumns rac)
{
if (sorter == null) {
sorter = NaiveSortMaker.fromRAC(rac).make(sortColumns);
} else {
sorter.moreData(rac);
}
return true;
}

@Override
public void completed()
{
receiver.push(sorter.complete());
receiver.completed();
}
}
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.operator;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;

public class NaiveSortOperatorFactory implements OperatorFactory
{
private final ArrayList<ColumnWithDirection> sortColumns;

@JsonCreator
public NaiveSortOperatorFactory(
@JsonProperty("columns") ArrayList<ColumnWithDirection> sortColumns
)
{
this.sortColumns = sortColumns;
}

@JsonProperty("columns")
public ArrayList<ColumnWithDirection> getSortColumns()
{
return sortColumns;
}

@Override
public Operator wrap(Operator op)
{
return new NaiveSortOperator(op, sortColumns);
}

@Override
public boolean validateEquivalent(OperatorFactory other)
{
if (other instanceof NaiveSortOperatorFactory) {
return sortColumns.equals(((NaiveSortOperatorFactory) other).getSortColumns());
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ interface Receiver
{
/**
* Used to push data. Return value indicates if more data will be accepted. If false, push should not
* be called anymore.
* be called anymore. If push is called after it returned false, undefined things will happen.
*
* @param rac {@link RowsAndColumns} of data
* @return a boolean value indicating if more data will be accepted. If false, push should never be called
Expand All @@ -62,7 +62,12 @@ interface Receiver
boolean push(RowsAndColumns rac);

/**
* Used to indicate that no more data will ever come
* Used to indicate that no more data will ever come. This is only used during the happy path and is not
* equivalent to a {@link java.io.Closeable#close()} method. Namely, there is no guarantee that this method
* will be called if execution halts due to an exception from push.
*
* It is acceptable for an implementation to eagerly close resources from this method, but it is not acceptable
* for this method to be the sole method of managing the lifecycle of resources held by the Operator
*/
void completed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "naivePartition", value = NaivePartitioningOperatorFactory.class),
@JsonSubTypes.Type(name = "naiveSort", value = NaiveSortOperatorFactory.class),
@JsonSubTypes.Type(name = "window", value = WindowOperatorFactory.class),
})
public interface OperatorFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -65,6 +66,11 @@ public WindowOperatorQuery(
super(dataSource, new LegacySegmentSpec(Intervals.ETERNITY), false, context);
this.rowSignature = rowSignature;
this.operators = operators;

// At this point, we can also reach into a QueryDataSource and validate that the ordering expected by the
// partitioning at least aligns with the ordering coming from the underlying query. We unfortunately don't
// have enough information to validate that the underlying ordering aligns with expectations for the actual
// window operator queries, but maybe we could get that and validate it here too.
if (!(dataSource instanceof QueryDataSource || dataSource instanceof InlineDataSource)) {
throw new IAE("WindowOperatorQuery must run on top of a query or inline data source, got [%s]", dataSource);
}
Expand All @@ -89,6 +95,7 @@ public boolean hasFilters()
}

@Override
@Nullable
public DimFilter getFilter()
{
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;

import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -67,9 +67,9 @@ public RowsAndColumns processInternal(
{
final AppendableRowsAndColumns retVal = RowsAndColumns.expectAppendable(incomingPartition);

SortedGroupPartitioner groupPartitioner = incomingPartition.as(SortedGroupPartitioner.class);
ClusteredGroupPartitioner groupPartitioner = incomingPartition.as(ClusteredGroupPartitioner.class);
if (groupPartitioner == null) {
groupPartitioner = new DefaultSortedGroupPartitioner(incomingPartition);
groupPartitioner = new DefaultClusteredGroupPartitioner(incomingPartition);
}

retVal.addColumn(outputColumn, fn.apply(groupPartitioner.computeBoundaries(groupingCols)));
Expand Down

0 comments on commit f1821a7

Please sign in to comment.