
Fix issues with partitioning boundaries for MSQ window functions #16729

Merged

Conversation

Akshat-Jain
Contributor

@Akshat-Jain Akshat-Jain commented Jul 12, 2024

Description

This PR fixes some issues with MSQ window functions.

Issue 1: NPE issues when multiple windows are used

Currently, queries like the following run into an NPE when using MSQ:

select countryName, cityName, channel, 
row_number() over (partition by cityName order by countryName, cityName, channel) as c1, 
count(channel) over (partition by cityName order by countryName, cityName, channel) as c2
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel

because the List<OperatorFactory> we get in the WindowOperatorQueryKit layer is [Sort, Partition, Window1, Window2].

WindowOperatorQueryKit grouped the window operator factories into different groups and asserted that a partition operator factory was present in each group. That assumption doesn't hold for queries like the one above, so the code ended up throwing an NPE.

Issue 2: Query correctness issues because of incorrect trimming of row signature

Currently, queries like the following give incorrect results when using MSQ:

select cityName, countryName,
row_number() over (partition by countryName order by countryName, cityName, channel) as c1,
count(channel) over (partition by cityName order by countryName, cityName, channel) as c2
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel

because we are incorrectly trimming the row signature in WindowOperatorQueryKit in the following piece of code:

      final int numberOfWindows = operatorList.size();
      final int baseSize = rowSignature.size() - numberOfWindows;
      for (int i = 0; i < baseSize; i++) {
        bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
      }

      for (int i = 0; i < numberOfWindows; i++) {
        bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
        // rest of the code
      }

Solution

Issue 1 is fixed by removing the assertion, and returning null when no partition operator factory is present for the current window stage evaluation. This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
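The revised behavior can be sketched roughly as follows. This is a minimal standalone sketch, not the actual Druid code: the class, the stand-in factory types, and the method name are all hypothetical; only the idea (return null instead of asserting when a stage has no partition operator factory) mirrors the fix.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WindowStagePartitioning
{
  // Stand-ins for Druid's OperatorFactory hierarchy (hypothetical).
  interface OperatorFactory {}
  static class SortOperatorFactory implements OperatorFactory {}
  static class WindowOperatorFactory implements OperatorFactory {}
  static class NaivePartitioningOperatorFactory implements OperatorFactory
  {
    final List<String> partitionColumns;
    NaivePartitioningOperatorFactory(List<String> cols)
    {
      this.partitionColumns = cols;
    }
  }

  // Instead of asserting that every stage contains a partition operator
  // factory (the old behavior, which caused the NPE), return null when none
  // is present: the data is already partitioned correctly and no shuffle
  // is needed for this stage.
  static List<String> findPartitionColumns(List<OperatorFactory> stageFactories)
  {
    for (OperatorFactory of : stageFactories) {
      if (of instanceof NaivePartitioningOperatorFactory) {
        return ((NaivePartitioningOperatorFactory) of).partitionColumns;
      }
    }
    return null; // no partition factory: skip shuffling for this stage
  }

  public static void main(String[] args)
  {
    List<OperatorFactory> withPartition = Arrays.asList(
        new SortOperatorFactory(),
        new NaivePartitioningOperatorFactory(Collections.singletonList("cityName")),
        new WindowOperatorFactory()
    );
    List<OperatorFactory> withoutPartition =
        Collections.singletonList(new WindowOperatorFactory());

    System.out.println(findPartitionColumns(withPartition));    // [cityName]
    System.out.println(findPartitionColumns(withoutPartition)); // null
  }
}
```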

Issue 2 is fixed by revamping the logic of computing the row signature for every window stage. Changes done to achieve this:

  1. Added a getOutputColumnNames() method to the Processor interface.
  2. Calculated the list of partition column names for a given window stage and passed that list to the window processor layer, for use in WindowOperatorQueryFrameProcessor#comparePartitionKeys.
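The name-based comparison in step 2 can be sketched roughly like this. It is an illustrative sketch only: rows are modeled as plain maps (the real processor compares frame rows), and the class and method names are hypothetical. It shows the switch from positional indexes (the old partitionColsIndex) to explicit column names.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PartitionKeyComparison
{
  // Two rows belong to the same partition iff they agree on every
  // partition column; a mismatch marks a partition boundary.
  static boolean samePartition(
      Map<String, Object> row1,
      Map<String, Object> row2,
      List<String> partitionColumnNames
  )
  {
    for (String col : partitionColumnNames) {
      if (!Objects.equals(row1.get(col), row2.get(col))) {
        return false; // partition boundary crossed
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    Map<String, Object> a = Map.of("countryName", "Austria", "cityName", "Vienna");
    Map<String, Object> b = Map.of("countryName", "Austria", "cityName", "Horsching");
    System.out.println(samePartition(a, b, List.of("countryName"))); // true
    System.out.println(samePartition(a, b, List.of("cityName")));    // false
  }
}
```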

Test Plan

  1. Added DruidWindowQueryTest and MSQDruidWindowQueryTest, which use the same wiring as DrillWindowQueryTest and MSQDrillWindowQueryTest. The common functionality was moved into a base class, WindowQueryTestBase. We decided to create a new layer as we didn't want to add non-Drill tests into the existing Drill test layer.
  2. Added a few categories of tests in DruidWindowQueryTest (and hence also in MSQDruidWindowQueryTest).
  3. Did a lot of manual testing as well.

Key changed/added classes in this PR
  • WindowOperatorQueryKit
  • WindowOperatorQueryFrameProcessor
  • Test files: DruidWindowQueryTest, MSQDruidWindowQueryTest, DrillWindowQueryTest, MSQDrillWindowQueryTest, WindowQueryTestBase

Release Note

This change is backwards incompatible, and can cause issues for MSQ queries with window functions during the upgrade.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - Querying Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Jul 12, 2024
Contributor

@sreemanamala sreemanamala left a comment

Alongside, I think it's better to not use the .q and .e files to run the normal test cases. They check only the output, which is expected for Drill tests, but in general we would lose the capability of asserting other aspects of planning and execution (for example - test cases under shuffle columns). For native engine window tests, we have parameterised tests which are still able to do stuff like operator validation.

@Akshat-Jain
Contributor Author

Alongside, I think it's better to not use the .q and .e files to run the normal test cases. They check only the output, which is expected for Drill tests, but in general we would lose the capability of asserting other aspects of planning and execution (for example - test cases under shuffle columns)

@sreemanamala It's intentional. I wanted to add a bunch of tests where we validate that the outputs of MSQ and sql-native engine are the same. We have a very nice wiring already for it in the Drill tests, so I re-used that.

We have a MSQWindowTest where more specific tests with more detailed validations should go - but that's MSQ specific. For now, I was hoping to validate that queries lead to same result with both engines. I expect to add a LOT of tests in the DruidWindowQueryTest file that I added, that would run for both engines. I feel we should have more tests that run for both engines, even if they verify only the outputs for now.

But to your point, I will add more specific tests to the MSQ specific test file as well for more detailed assertions. I'm kinda hoping to revamp the rest of the logic in the next 2-3 PRs, detailed tests would make more sense then. Hope that sounds good! :)

@Akshat-Jain Akshat-Jain force-pushed the msq-wf-partitioning-boundary-issues branch from ad1b6ad to fe1c300 Compare July 16, 2024 04:47
Member

@kgyrtkirk kgyrtkirk left a comment

the WindowQueryTestBase has a lot of things relating to drill - why the need to move those?

@@ -259,7 +258,7 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
       if (outputRow == null) {
         outputRow = currentRow;
         objectsOfASingleRac.add(currentRow);
-      } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
+      } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
Member

this is a little bit confusing with that runAllOpsOnSingleRac method; I believe the operators should only be run once...and not construct all of them for every RAC

what happens here seems to be quite similar to what the NaivePartitioningOperator does - but in a streaming fashion...
I think it would be better to implement this as an operator - that way the partitionColumnNames could also live inside the operators - and not need a different path to get passed.

but since this is a bug fix pr - this might be out of scope...

Contributor Author

and not construct all of them for every RAC

We aren't constructing the sort, partitioning and window operator for every RAC, if that's what you meant. They are coming from operatorFactoryList declared at class level.

runAllOpsOnSingleRac does have new Operator() though, do you mean that this need not be constructed for every RAC?

Member

I meant that in the else branch there is a call to runAllOpsOnSingleRac, which builds an operator list to process - but that list gets destroyed after the frame is processed, and a new one is built for the next RAC...

as a RAC in this case could mean even a single row - that makes it a bit inefficient, as setup/shutdown cost is added to every processed RAC

@@ -259,7 +258,7 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
       if (outputRow == null) {
         outputRow = currentRow;
         objectsOfASingleRac.add(currentRow);
-      } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
+      } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
Member

looking at what comparePartitionKeys is doing (it produces garbage) - and that it gets called for each row...I'm not sure if this is the right approach...

it would be probably better to:

  • push all rows until it hits the roof into the rac
  • use ArrayListRowsAndColumns's partitioning to identify the smaller sections
  • submit all partitions except the last
  • move those rows into a new starter rac; restart from the beginning
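The suggested batching approach could be sketched along these lines. This is a simplified, hypothetical illustration (rows reduced to partition-key strings, no RACs or frames): accumulate a batch, split it at partition boundaries, emit every complete partition, and carry the possibly incomplete trailing partition into the next batch.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedPartitionFlush
{
  // Split a sorted batch of partition keys into runs of equal keys and
  // return all complete runs. The trailing run is left in `carryOver`,
  // because more rows with the same key may still arrive in the next batch.
  static List<List<String>> flushCompletePartitions(List<String> batch, List<String> carryOver)
  {
    List<List<String>> complete = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (String key : batch) {
      if (!current.isEmpty() && !current.get(current.size() - 1).equals(key)) {
        complete.add(current); // boundary hit: this partition is complete
        current = new ArrayList<>();
      }
      current.add(key);
    }
    carryOver.clear();
    carryOver.addAll(current); // last run seeds the next batch
    return complete;
  }

  public static void main(String[] args)
  {
    List<String> carry = new ArrayList<>();
    List<List<String>> done = flushCompletePartitions(List.of("a", "a", "b", "b", "c"), carry);
    System.out.println(done);  // [[a, a], [b, b]]
    System.out.println(carry); // [c]
  }
}
```

Compared to the per-row comparePartitionKeys call, this only inspects each row once per batch and does boundary detection on whole runs, which is the gist of the reviewer's suggestion.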

Contributor Author

This seems like a much bigger refactoring task, hence beyond the scope of this PR? 😅
I do like the idea though.

Member

totally agree - I've either missed the review; or more likely I haven't realized that the above is a possible alternate approach which could work better

Comment on lines +210 to +230
log.info("Using row signature [%s] for window stage.", stageRowSignature);

boolean partitionOperatorExists = false;
List<String> currentPartitionColumns = new ArrayList<>();
for (OperatorFactory of : operatorList.get(i)) {
  if (of instanceof NaivePartitioningOperatorFactory) {
    for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
      currentPartitionColumns.add(s);
      partitionOperatorExists = true;
    }
  }
}

if (partitionOperatorExists) {
  partitionColumnNames = currentPartitionColumns;
}

log.info(
    "Columns which would be used to define partitioning boundaries for this window stage are [%s]",
    partitionColumnNames
);
Member

wouldn't it make it a bit more readable to have this inside a method?
I don't agree with going through all the operators and adding every one's partition columns to a single list...

all the code and stuff here naturally wants to have an object like:

class WndStage {
  PartitionOperator partitionOperator;
  SortOperator sortOperator;
  List<Operator>  workOperators;
}

even the existence of such a class will ensure that there is no more than one partition operator in a stage, and it also gives a home for methods like this

Contributor Author

I love this idea!

Can I take it up in a separate future PR though?

For my next PR, I'm working on revamping the logic of getOperatorListFromQuery() method to fix scenarios with empty over() clauses. I can either make this refactoring change in that PR, or the one after that.

Thoughts?

Member

of course!
I would recommend separating these as much as possible - small PRs tend to get reviewed faster, and because of their size the feedback is usually also much better!

@Akshat-Jain
Contributor Author

@kgyrtkirk

the WindowQueryTestBase has a lot of things relating to drill - why the need to move those?

I created this base class since I added DruidWindowQueryTest, which had a lot of common methods with DrillWindowQueryTest. So I moved the common methods and logic to WindowQueryTestBase.

@kgyrtkirk kgyrtkirk self-requested a review July 17, 2024 08:22
Member

@kgyrtkirk kgyrtkirk left a comment

thank you for the changes! :)
I've no more questions :D
+1 (pending tests)

Member

@asdf2014 asdf2014 left a comment

🚀

@asdf2014 asdf2014 merged commit b53c26f into apache:master Jul 18, 2024
88 checks passed

 @JsonCreator
 public WindowOperatorQueryFrameProcessorFactory(
     @JsonProperty("query") WindowOperatorQuery query,
     @JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
     @JsonProperty("stageRowSignature") RowSignature stageRowSignature,
     @JsonProperty("emptyOver") boolean emptyOver,
-    @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow
+    @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
+    @JsonProperty("partitionColumnNames") List<String> partitionColumnNames
Contributor

This should be marked nullable to maintain backward compatibility.
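For illustration, a null-tolerant constructor might look like the following. This is a minimal standalone sketch, not the real factory: the class name is hypothetical and the Jackson @JsonProperty/@Nullable annotations are omitted so the sketch stays self-contained. The point is only that a newly added field should accept null so payloads serialized by older versions (which omit "partitionColumnNames") still deserialize cleanly.

```java
import java.util.Collections;
import java.util.List;

public class NullTolerantFactory
{
  final List<String> partitionColumnNames;

  NullTolerantFactory(List<String> partitionColumnNames)
  {
    // Older senders omit the field entirely, which deserializes as null;
    // fall back to an empty list rather than failing later with an NPE.
    this.partitionColumnNames = partitionColumnNames == null
                                ? Collections.emptyList()
                                : partitionColumnNames;
  }

  public static void main(String[] args)
  {
    System.out.println(new NullTolerantFactory(null).partitionColumnNames); // []
  }
}
```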

Contributor Author

I am removing emptyOver in #16754, as it's redundant with partitionColumnNames. My thinking was that it's okay to not worry about backward compatibility, in favor of keeping a cleaner codebase - considering this feature isn't GA yet.
Thoughts?

// Later we should also check if these can be parallelized.
// Check if there is an empty OVER() clause or not.
RowSignature rowSignature = originalQuery.getRowSignature();
log.info("Row signature received for query is [%s].", rowSignature);
Contributor

This log statement does not add any value for the end user.

Contributor Author

I wanted to add logs for better debuggability. We can certainly tone down the logging when this has had some soak time and we have more confidence on the stability of it. Thoughts?

edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Jul 19, 2024
…che#16729)

* Fix issues with partitioning boundaries for MSQ window functions

* Address review comments

* Address review comments

* Add test for coverage check failure

* Address review comment

* Remove DruidWindowQueryTest and WindowQueryTestBase, move those tests to DrillWindowQueryTest

* Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java

* Address review comments

* Add test for equals and hashcode for WindowOperatorQueryFrameProcessorFactory

* Address review comment

* Fix checkstyle

---------

Co-authored-by: Benedict Jin <asdf2014@apache.org>
sreemanamala pushed a commit to sreemanamala/druid that referenced this pull request Aug 6, 2024