Disabling rollup on post agg operators for MSQ sql based ingestion. #13179

Closed
wants to merge 3 commits into from

cryptoe
Contributor

@cryptoe cryptoe commented Oct 4, 2022

While running the query:

REPLACE INTO country_shard_test_1 OVERWRITE ALL
SELECT
  Country,
  Capital,
  THETA_SKETCH_INTERSECT(
    DS_THETA(Country),
    DS_THETA(Capital)
  ) AS sketch
FROM TABLE(
  EXTERN(
    '{"type":"http","uris":["https://static.imply.io/lookup/country.tsv"]}',
    '{"type":"tsv","findColumnsFromHeader":true}',
    '[{"name":"Country","type":"string"},{"name":"Capital","type":"string"},{"name":"ISO3","type":"string"},{"name":"ISO2","type":"string"}]'
  )
)
GROUP BY 1, 2
PARTITIONED BY ALL

we get an unknown error:

UnknownError: java.util.NoSuchElementException (Stack trace)
Failed task ID: query-0bc9b9c3-de75-416c-bfdb-6ab802871826 (on host: localhost:8100)
Debug: get query detail archive
Full stack trace
java.util.NoSuchElementException
    at java.base/java.util.Collections$EmptyIterator.next(Collections.java:4210)
    at com.google.common.collect.Iterators.getOnlyElement(Iterators.java:297)
    at com.google.common.collect.Iterables.getOnlyElement(Iterables.java:285)
    at org.apache.druid.msq.exec.ControllerImpl.makeDimensionsAndAggregatorsForIngestion(ControllerImpl.java:1646)
    at org.apache.druid.msq.exec.ControllerImpl.generateDataSchema(ControllerImpl.java:1431)
    at org.apache.druid.msq.exec.ControllerImpl.makeQueryDefinition(ControllerImpl.java:1398)
    at org.apache.druid.msq.exec.ControllerImpl.initializeQueryDefAndState(ControllerImpl.java:523)
    at org.apache.druid.msq.exec.ControllerImpl.runTask(ControllerImpl.java:346)
    at org.apache.druid.msq.exec.ControllerImpl.run(ControllerImpl.java:296)
    at org.apache.druid.msq.indexing.MSQControllerTask.run(MSQControllerTask.java:192)
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477)
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

In SQL-based ingestion with rollup enabled, we do not know how to ingest columns produced by post-aggregators, so I have disabled the code path that attempted it.
Instead, it now nudges the user to disable rollup by following the instructions here: https://druid.apache.org/docs/24.0.0/multi-stage-query/concepts.html#rollup

Fixed the bug ...


Key changed/added classes in this PR
  • ControllerImpl

This PR has:

  • been self-reviewed.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

@cryptoe
Contributor Author

cryptoe commented Oct 4, 2022

This PR is related to : #13180

@abhishekagarwal87 abhishekagarwal87 added the Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 label Oct 4, 2022
@abhishekagarwal87
Contributor

Iterables.getOnlyElement should be banned and replaced with something that forces developers to provide a nice error message 😄, both for the case where there is no element and for the case where there is more than one element in the collection.
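A minimal sketch of what such a replacement could look like, using only the JDK. The class name `OnlyElement`, the method `getOnlyElementOrThrow`, and the message wording are all hypothetical; nothing here is an existing Druid or Guava API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical helper; not part of Druid or Guava.
final class OnlyElement {
  private OnlyElement() {}

  // Returns the single element of items, or throws an IllegalStateException
  // that names what the caller was looking for.
  static <T> T getOnlyElementOrThrow(Collection<T> items, String what) {
    if (items == null || items.isEmpty()) {
      throw new IllegalStateException("Expected exactly one " + what + " but found none");
    }
    if (items.size() > 1) {
      throw new IllegalStateException(
          "Expected exactly one " + what + " but found " + items.size() + ": " + items);
    }
    return items.iterator().next();
  }

  public static void main(String[] args) {
    String what = "output column for aggregator [a0]";
    System.out.println(getOnlyElementOrThrow(Arrays.asList("sketch"), what));
    try {
      getOnlyElementOrThrow(Collections.<String>emptyList(), what);
    } catch (IllegalStateException e) {
      // The message describes the failure instead of a bare NoSuchElementException.
      System.out.println(e.getMessage());
    }
  }
}
```

Because the description is a required argument, a caller cannot hit either failure case without saying what was being looked up.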

List<String> outputColumns = columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName());
if (outputColumns == null || outputColumns.size() != 1) {
  throw new ISE(
      "Unable to run the statement in roll up mode. Please try disabling the rollup mode. Check SQL-based ingestion docs for instructions.");
Contributor

Suggested change
- "Unable to run the statement in roll up mode. Please try disabling the rollup mode. Check SQL-based ingestion docs for instructions.");
+ "Cannot handle <aggregator-name> in the rollup mode. You can disable the rollup mode or use different aggregators. Please refer to SQL-based ingestion docs for more details.");

Contributor

We should also log the outputColumns and any other info for troubleshooting.

@davecromberge
Member

@cryptoe thank you for looking into this issue. As a user, I'm somewhat confused - it appears as if ingesting records using multi-stage SQL does not support rollups on post-aggregators such as a sketch intersection. What are the consequences of this? For example, if my SQL statement is as follows:

INSERT INTO user_activity
SELECT user_cohort_country.__time, user_cohort_country.cohort, user_cohort_country.country, 
    THETA_SKETCH_INTERSECT(
      DS_THETA(user_cohort_country.segments), 
      DS_THETA(user_cohort_country.countries)
    ) as sketch
FROM user_cohort_country
GROUP BY 1, 2, 3
PARTITIONED BY DAY

Is the rollup automatic, or does it replicate the rollup configuration of the source table? I'm trying to understand where the rollup is implied in the statement above.

@cryptoe
Contributor Author

cryptoe commented Oct 4, 2022

If tomorrow you decide to partition by month using a reindex, Druid needs to understand how to merge the THETA_SKETCH_INTERSECT results across the various days.
In rollup mode, we figure this out automatically, meaning that you can submit an auto-compaction task and the Druid engine will be smart enough to figure out how to merge across days.
Having said that, you can also submit a manual compaction task and tell the Druid engine how to merge the sketch column.
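One way to see why merging a stored intersection is not trivial is the toy sketch below. It uses plain Java sets as stand-ins for theta sketches, with made-up values; it is not Druid code and does not show how Druid merges sketches internally.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Plain Java sets stand in for theta sketches here; this is not Druid code.
final class IntersectMergeDemo {
  static Set<Integer> setOf(Integer... values) {
    return new HashSet<>(Arrays.asList(values));
  }

  static Set<Integer> union(Set<Integer> x, Set<Integer> y) {
    Set<Integer> result = new HashSet<>(x);
    result.addAll(y);
    return result;
  }

  static Set<Integer> intersect(Set<Integer> x, Set<Integer> y) {
    Set<Integer> result = new HashSet<>(x);
    result.retainAll(y);
    return result;
  }

  public static void main(String[] args) {
    // Made-up per-day inputs for the two columns being sketched.
    Set<Integer> day1Segments = setOf(1, 2);
    Set<Integer> day1Countries = setOf(2, 3);
    Set<Integer> day2Segments = setOf(3);
    Set<Integer> day2Countries = setOf(1);

    // Merging the stored per-day intersections by union.
    Set<Integer> mergedStored = union(
        intersect(day1Segments, day1Countries),
        intersect(day2Segments, day2Countries));

    // Intersecting after merging the inputs over both days.
    Set<Integer> recomputed = intersect(
        union(day1Segments, day2Segments),
        union(day1Countries, day2Countries));

    System.out.println(mergedStored.size()); // 1
    System.out.println(recomputed.size());   // 3
  }
}
```

The two results differ, so whoever re-partitions the data has to know which merge semantics the stored column is supposed to have.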

There are always limits to this.
For a query like
insert into abc select a,b, count(a)/count(b) from cde group by 1,2 partitioned by day
we can't really merge the ratio across days. Instead, what's recommended is
insert into abc select a,b, count(a),count(b) from cde group by 1,2 partitioned by day
and then doing the division at query time.
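A small worked example of the arithmetic, with made-up counts for two days. The class and method names are illustrative only. It compares combining stored per-day ratios (here by averaging, one plausible but lossy choice) against dividing the summed counts.

```java
// Illustrative arithmetic only; the counts are made up.
final class RatioRollupDemo {
  // Average of per-day ratios: all that is available if only count(a)/count(b) was stored.
  static double averageOfRatios(long[] countA, long[] countB) {
    double sum = 0;
    for (int i = 0; i < countA.length; i++) {
      sum += (double) countA[i] / countB[i];
    }
    return sum / countA.length;
  }

  // Ratio of summed counts: possible when count(a) and count(b) are stored separately.
  static double ratioOfSums(long[] countA, long[] countB) {
    long totalA = 0;
    long totalB = 0;
    for (int i = 0; i < countA.length; i++) {
      totalA += countA[i];
      totalB += countB[i];
    }
    return (double) totalA / totalB;
  }

  public static void main(String[] args) {
    long[] countA = {1, 3}; // count(a) for day 1 and day 2
    long[] countB = {2, 4}; // count(b) for day 1 and day 2
    System.out.println(averageOfRatios(countA, countB)); // (0.5 + 0.75) / 2 = 0.625
    System.out.println(ratioOfSums(countA, countB));     // 4 / 6, about 0.667
  }
}
```

The stored ratios alone cannot reproduce 4/6, whereas the two counts merge by simple addition and the division can happen at query time.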

Is the rollup automatic, or does it replicate the rollup configuration of the source table? I'm trying to understand where the rollup is implied in the statement above.

See https://druid.apache.org/docs/latest/multi-stage-query/concepts.html#rollup. The web console automatically figures out whether the statement is a rollup statement or not.

@abhishekagarwal87 abhishekagarwal87 added this to the 24.0.1 milestone Oct 5, 2022
@abhishekagarwal87 abhishekagarwal87 removed this from the 24.0.1 milestone Oct 20, 2022
@cryptoe cryptoe added the WIP label Nov 29, 2022
# Conflicts:
#	extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@cryptoe cryptoe removed the WIP label Mar 10, 2023
List<String> outputColumns = columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName());
if (outputColumns == null || outputColumns.size() != 1) {
  throw new ISE(
      "Cannot use aggregator [%s] with input fields [%s] in the rollup mode. It might be using a post aggregator. Please check the native plan to figure out more information about the aggregator. You can disable the rollup mode or use a different aggregator. Please refer to SQL-based ingestion docs for more details.",
Contributor

This error should really be in the SQL layer, for a couple reasons:

  1. It's a validation-style error that doesn't require actually executing the query. So if we can detect this problem in the SQL layer, users will get their error faster, and won't have tasks launched needlessly.
  2. We can refer to SQL concepts and SQL names rather than native concepts like "aggregator" and native names like a0.

There's also a couple of issues with actionability that should be fixed when moving this error to the SQL layer:

  • It suggests "disable the rollup mode" as a way to fix the problem, but "rollup mode" is not a thing in MSQ. As we mention in multi-stage-query/concepts.md, rollup is something that MSQ does automatically when certain conditions are met. The user would need to switch on aggregation finalization, which may not be what they want.
  • It suggests "refer to SQL-based ingestion docs", but doesn't provide a link or a hint on what to look for. Doc references should be URLs.
  • It suggests "check the native plan", but doesn't say how to do that. (Although I think when we move this to SQL layer, we won't need this part.)

There's three places we can check things in the SQL layer:

  1. In the SQL validation phase (after parsing, prior to optimizing), i.e. in validate in DruidPlanner or IngestHandler.
  2. Immediately prior to translation from logical plan to Druid plan, i.e. in MSQTaskSqlEngine#buildQueryMakerForInsert
  3. Immediately prior to execution, i.e. in MSQTaskQueryMaker (the latest possible time to validate something before a controller task is launched)

Ideally we validate as much as possible as early as possible. Also, ideally, we validate it in a place where we can know the SQL name of the agg function (e.g. from an instance of SqlAggFunction). That'll let us include the SQL name in the error message.

I'm not totally sure which place is best for this. My guess is (1) or (2), since by (3) we've gone pretty much fully native. Perhaps @paul-rogers would have some advice as he has some experience with the validation stack.
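To make the idea concrete, here is a rough, self-contained sketch of a pre-launch check that reports problems in SQL terms and includes a docs URL. Everything in it is hypothetical: `RollupPreflight`, `OutputExpr`, the `combinesAggregates` flag, and the message wording are invented for illustration and do not correspond to DruidPlanner, IngestHandler, MSQTaskSqlEngine, or any other Druid class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight check that would run before any controller task is launched.
final class RollupPreflight {
  static final String DOCS =
      "https://druid.apache.org/docs/latest/multi-stage-query/concepts.html#rollup";

  // Hypothetical model of one SELECT output column; not a Druid class.
  static final class OutputExpr {
    final String alias;
    final String sqlText;
    final boolean combinesAggregates; // assumed to be known from the SQL plan

    OutputExpr(String alias, String sqlText, boolean combinesAggregates) {
      this.alias = alias;
      this.sqlText = sqlText;
      this.combinesAggregates = combinesAggregates;
    }
  }

  // Collects one message per offending column, using SQL names only.
  static List<String> validate(List<OutputExpr> outputs) {
    List<String> errors = new ArrayList<>();
    for (OutputExpr out : outputs) {
      if (out.combinesAggregates) {
        errors.add("Column [" + out.alias + "] is computed by [" + out.sqlText
            + "], which combines aggregations and cannot be rolled up. See " + DOCS);
      }
    }
    return errors;
  }

  public static void main(String[] args) {
    List<OutputExpr> outputs = Arrays.asList(
        new OutputExpr("Country", "Country", false),
        new OutputExpr("sketch",
            "THETA_SKETCH_INTERSECT(DS_THETA(Country), DS_THETA(Capital))", true));
    for (String error : validate(outputs)) {
      System.out.println(error);
    }
  }
}
```

The point of the sketch is only the shape of the check: it needs no query execution, and the message names the SQL alias and expression rather than a native name like a0.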

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions bot added the stale label Jan 10, 2024

github-actions bot commented Feb 8, 2024

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Feb 8, 2024
Labels
Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 stale
4 participants