Fix issues with partitioning boundaries for MSQ window functions #16729

Merged
@@ -38,7 +38,6 @@
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
@@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final WindowOperatorQuery query;

private final List<OperatorFactory> operatorFactoryList;
private final List<String> partitionColumnNames;
private final ObjectMapper jsonMapper;
private final ArrayList<RowsAndColumns> frameRowsAndCols;
private final ArrayList<RowsAndColumns> resultRowAndCols;
@@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameReader frameReader;
private final ArrayList<ResultRow> objectsOfASingleRac;
private final int maxRowsMaterialized;
List<Integer> partitionColsIndex;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
@@ -97,7 +96,8 @@ public WindowOperatorQueryFrameProcessor(
final List<OperatorFactory> operatorFactoryList,
final RowSignature rowSignature,
final boolean isOverEmpty,
final int maxRowsMaterializedInWindow
final int maxRowsMaterializedInWindow,
final List<String> partitionColumnNames
)
{
this.inputChannel = inputChannel;
@@ -110,9 +110,9 @@ public WindowOperatorQueryFrameProcessor(
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.partitionColsIndex = new ArrayList<>();
this.isOverEmpty = isOverEmpty;
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}

@Override
@@ -177,12 +177,12 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
*
* Future thoughts: {@link https://github.com/apache/druid/issues/16126}
*
* 1. We are writing 1 partition to each frame in this way. In case of low cardinality data
* we will me making a large number of small frames. We can have a check to keep size of frame to a value
* 1. We are writing 1 partition to each frame in this way. In case of high cardinality data
* we will be making a large number of small frames. We can have a check to keep size of frame to a value
* say 20k rows and keep on adding to the same pending frame and not create a new frame
*
* 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data
* with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data.
* We might think to reimplement them in the MSQ way so that we do not have to materialize so much data
*/
@@ -218,7 +218,6 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
partitionColsIndex = findPartitionColumns(frameReader.signature());
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
for (int i = 0; i < fieldSuppliers.length; i++) {
final ColumnValueSelector<?> selector =
@@ -259,18 +258,17 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
if (outputRow == null) {
outputRow = currentRow;
objectsOfASingleRac.add(currentRow);
} else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
Member:
this is a little bit confusing with that runAllOpsOnSingleRac method; I believe the operators should only be run once...and not construct all of them for every RAC

what happens here seems to be quite similar to what the NaivePartitioningOperator does - but in a streaming fashion...
I think it would be better to implement this as an operator - that way the partitionColumnNames could also live inside the operators - and not need a different path to get passed.

but since this is a bug fix pr - this might be out of scope...

Contributor Author:
> and not construct all of them for every RAC

We aren't constructing the sort, partitioning and window operator for every RAC, if that's what you meant. They are coming from operatorFactoryList declared at class level.

runAllOpsOnSingleRac does have new Operator() though, do you mean that this need not be constructed for every RAC?

Member:

I meant that in the else branch there is a call to runAllOpsOnSingleRac which kicks off processing of an operator list - but that gets destroyed after the frame is processed and a new one is built for the next RAC...

as a RAC in this case could be even a single row - that makes it a bit inefficient, as setup/shutdown cost is added to every processed RAC

Member:
looking at what comparePartitionKeys is doing (produces garbage) - and that it gets called for-each-row...I'm not sure if this is the right approach...

it would be probably better to:

  • push all rows until it hits the roof into the rac
  • use ArrayListRowsAndColumns's partitioning to identify the smaller sections
  • submit all partitions except the last
  • move those rows into a new starter rac; restart from the beginning
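The batching idea sketched in these bullets could look roughly like the following. This is a hypothetical, self-contained illustration only: plain List<Object> rows stand in for Druid's RowsAndColumns, and the names BatchedPartitioner, maxRows, and keyIndices are invented, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Sketch: buffer rows up to a cap, emit every complete partition, and carry
// the trailing (possibly incomplete) partition over into the next buffer.
public class BatchedPartitioner
{
  private final List<List<Object>> buffer = new ArrayList<>();
  private final int maxRows;                       // "the roof"
  private final List<Integer> keyIndices;          // positions of partition key columns
  private final Consumer<List<List<Object>>> emit; // downstream consumer of complete partitions

  public BatchedPartitioner(int maxRows, List<Integer> keyIndices, Consumer<List<List<Object>>> emit)
  {
    this.maxRows = maxRows;
    this.keyIndices = keyIndices;
    this.emit = emit;
  }

  public void add(List<Object> row)
  {
    buffer.add(row);
    if (buffer.size() >= maxRows) {
      flushCompletePartitions();
    }
  }

  // At end of input, whatever remains forms the final batch.
  public void finish()
  {
    if (!buffer.isEmpty()) {
      emit.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  // Emit all rows before the start of the last partition; keep the tail buffered.
  private void flushCompletePartitions()
  {
    int lastStart = buffer.size() - 1;
    while (lastStart > 0 && sameKey(buffer.get(lastStart - 1), buffer.get(lastStart))) {
      lastStart--;
    }
    if (lastStart == 0) {
      return; // whole buffer is one partition; real code would apply the row guardrail here
    }
    List<List<Object>> tail = new ArrayList<>(buffer.subList(lastStart, buffer.size()));
    emit.accept(new ArrayList<>(buffer.subList(0, lastStart)));
    buffer.clear();
    buffer.addAll(tail);
  }

  private boolean sameKey(List<Object> a, List<Object> b)
  {
    for (int i : keyIndices) {
      if (!Objects.equals(a.get(i), b.get(i))) {
        return false;
      }
    }
    return true;
  }
}
```

With this shape, setup/shutdown cost is paid once per batch rather than once per partition, which is the inefficiency the comment above points at.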

Contributor Author:
This seems like a much bigger refactoring task, hence beyond the scope of this PR? 😅
I do like the idea though.

Member:
totally agree - either I missed it during review, or more likely I hadn't realized that the above is a possible alternate approach which could work better

// if they have the same partition key
// keep adding them after checking
// guardrails
objectsOfASingleRac.add(currentRow);
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
objectsOfASingleRac.size(),
maxRowsMaterialized
));
}
objectsOfASingleRac.add(currentRow);

} else {
// key change noted
// create rac from the rows seen before
@@ -484,37 +482,36 @@ private void convertRowFrameToRowsAndColumns(Frame frame)
frameRowsAndCols.add(ldrc);
}

private List<Integer> findPartitionColumns(RowSignature rowSignature)
{
List<Integer> indexList = new ArrayList<>();
for (OperatorFactory of : operatorFactoryList) {
if (of instanceof NaivePartitioningOperatorFactory) {
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
indexList.add(rowSignature.indexOf(s));
}
}
}
return indexList;
}

/**
*
* Compare two rows based only the columns in the partitionIndices
* In case the parition indices is empty or null compare entire row
*
* Compare two rows based on the columns in partitionColumnNames.
* If the partitionColumnNames is empty or null, compare entire row.
* <p>
* For example, say:
* <ul>
* <li>partitionColumnNames = ["d1", "d2"]</li>
* <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
* <li>frameReader.signature.indexOf("d1") = 0</li>
* <li>frameReader.signature.indexOf("d2") = 1</li>
* <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
* <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
* </ul>
* <p>
* Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise.
* Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
*/
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
{
if (partitionIndices == null || partitionIndices.isEmpty()) {
if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
return row1.equals(row2);
} else {
int match = 0;
for (int i : partitionIndices) {
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
if (Objects.equals(row1.get(i), row2.get(i))) {
match++;
}
}
return match == partitionIndices.size();
return match == partitionColumnNames.size();
}
}
}
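The comparison semantics described in the javadoc above can be illustrated with a self-contained sketch. This is a short-circuiting variant with the same outcome as the patched method; a plain Map stands in for Druid's RowSignature, and the names PartitionKeyCheck and samePartition are invented for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Two rows belong to the same partition when every named partition column,
// resolved to its index via the row signature, holds an equal value.
public class PartitionKeyCheck
{
  public static boolean samePartition(
      List<Object> row1,
      List<Object> row2,
      List<String> partitionColumnNames,
      Map<String, Integer> signature
  )
  {
    if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
      return row1.equals(row2); // empty OVER(): rows match only if fully equal
    }
    for (String columnName : partitionColumnNames) {
      int i = signature.get(columnName);
      if (!Objects.equals(row1.get(i), row2.get(i))) {
        return false; // first mismatching key column decides the boundary
      }
    }
    return true;
  }
}
```

Resolving names against the current frame's signature (rather than caching indices computed from another signature) is what makes the per-frame lookup in the fix robust to column-order differences.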
@@ -61,21 +61,24 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
private final RowSignature stageRowSignature;
private final boolean isEmptyOver;
private final int maxRowsMaterializedInWindow;
private final List<String> partitionColumnNames;

@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(
@JsonProperty("query") WindowOperatorQuery query,
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
@JsonProperty("emptyOver") boolean emptyOver,
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
@JsonProperty("partitionColumnNames") List<String> partitionColumnNames
Contributor:
This should be marked nullable to maintain backward compatibility.

Contributor Author:

I am removing emptyOver in #16754, as it's redundant with partitionColumnNames. My thinking was that it's okay to not worry about backward compatibility, in favor of keeping a cleaner codebase - considering this feature isn't GA yet.
Thoughts?

)
{
this.query = Preconditions.checkNotNull(query, "query");
this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator");
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
this.isEmptyOver = emptyOver;
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}

@JsonProperty("query")
@@ -90,6 +93,12 @@ public List<OperatorFactory> getOperators()
return operatorList;
}

@JsonProperty("partitionColumnNames")
public List<String> getPartitionColumnNames()
{
return partitionColumnNames;
}

@JsonProperty("stageRowSignature")
public RowSignature getSignature()
{
@@ -148,7 +157,6 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
readableInput -> {
final OutputChannel outputChannel =
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());

return new WindowOperatorQueryFrameProcessor(
query,
readableInput.getChannel(),
@@ -159,7 +167,8 @@
operatorList,
stageRowSignature,
isEmptyOver,
maxRowsMaterializedInWindow
maxRowsMaterializedInWindow,
partitionColumnNames
);
}
);
@@ -185,12 +194,13 @@ public boolean equals(Object o)
&& maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
&& Objects.equals(query, that.query)
&& Objects.equals(operatorList, that.operatorList)
&& Objects.equals(partitionColumnNames, that.partitionColumnNames)
&& Objects.equals(stageRowSignature, that.stageRowSignature);
}

@Override
public int hashCode()
{
return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
}
}