Group By of windowed sources results in UnsupportedOperationException #4396

Open
big-andy-coates opened this issue Jan 28, 2020 · 0 comments
Labels
bug P0 Denotes must-have for a given milestone

big-andy-coates commented Jan 28, 2020

QTT test case:

{
  "name": "windowed stream source using window bounds columns",
  "statements": [
    "CREATE STREAM TEST (ID bigint KEY, ignored bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', WINDOW_TYPE='Tumbling', WINDOW_SIZE='30 seconds');",
    "CREATE TABLE S2 as SELECT count(1) as count, min(windowstart) as wstart, min(windowend) as wend FROM test GROUP BY ID;"
  ],
  "inputs": [
    {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
    {"topic": "test_topic", "key": 0, "value": "100", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
    {"topic": "test_topic", "key": 0, "value": "10", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
    {"topic": "test_topic", "key": 1, "value": "50", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}}
  ],
  "outputs": [
    {"topic": "S2", "key": 0, "value": "1,0,30000", "timestamp": 0},
    {"topic": "S2", "key": 0, "value": "2,0,30000", "timestamp": 0},
    {"topic": "S2", "key": 0, "value": "3,0,30000", "timestamp": 0},
    {"topic": "S2", "key": 1, "value": "1,30000,60000", "timestamp": 30000}
  ],
  "post": {
    "sources": [
      {
        "name": "S2",
        "type": "table",
        "keyFormat": {"format": "KAFKA", "windowType": null},
        "schema": "ROWKEY INT KEY, COUNT BIGINT, WSTART BIGINT, WEND BIGINT"
      }
    ]
  }
}

Results in error:


io.confluent.ksql.util.KsqlStatementException: Group by on windowed should always require rekey
Statement: CREATE TABLE S2 AS SELECT
  COUNT(1) COUNT,
  MIN(TEST.WINDOWSTART) WSTART,
  MIN(TEST.WINDOWEND) WEND
FROM TEST TEST
GROUP BY TEST.ROWKEY
EMIT CHANGES
Statement: CREATE TABLE S2 as SELECT count(1) as count, min(windowstart) as wstart, min(windowend) as wend FROM test GROUP BY ROWKEY;

	at io.confluent.ksql.test.tools.TestExecutorUtil$PlannedStatementIterator.planStatement(TestExecutorUtil.java:407)
	at io.confluent.ksql.test.tools.TestExecutorUtil$PlannedStatementIterator.hasNext(TestExecutorUtil.java:352)
	at io.confluent.ksql.test.tools.TestExecutorUtil.execute(TestExecutorUtil.java:259)
	at io.confluent.ksql.test.tools.TestExecutorUtil.doBuildQueries(TestExecutorUtil.java:208)
	at io.confluent.ksql.test.tools.TestExecutorUtil.buildStreamsTopologyTestDrivers(TestExecutorUtil.java:89)
	at io.confluent.ksql.test.tools.TestExecutor.buildAndExecuteQuery(TestExecutor.java:134)
	at io.confluent.ksql.test.EndToEndEngineTestUtil.shouldBuildAndExecuteQuery(EndToEndEngineTestUtil.java:46)
	at io.confluent.ksql.test.QueryTranslationTest.shouldBuildAndExecuteQueries(QueryTranslationTest.java:83)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.lang.UnsupportedOperationException: Group by on windowed should always require rekey
	at io.confluent.ksql.structured.SchemaKStream.groupByKey(SchemaKStream.java:464)
	at io.confluent.ksql.structured.SchemaKStream.groupBy(SchemaKStream.java:432)
	at io.confluent.ksql.planner.plan.AggregateNode.buildStream(AggregateNode.java:211)
	at io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode.buildStream(KsqlStructuredDataOutputNode.java:114)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:73)
	at io.confluent.ksql.engine.QueryEngine.buildPhysicalPlan(QueryEngine.java:105)
	at io.confluent.ksql.engine.EngineExecutor.planQuery(EngineExecutor.java:207)
	at io.confluent.ksql.engine.EngineExecutor.plan(EngineExecutor.java:157)
	at io.confluent.ksql.engine.KsqlEngine.plan(KsqlEngine.java:176)
	at io.confluent.ksql.test.tools.TestExecutorUtil$PlannedStatementIterator.planStatement(TestExecutorUtil.java:397)
	... 37 more

This error message is really unhelpful.

The query takes a windowed stream with the following schema (including the window bounds columns):

ID BIGINT KEY, WINDOWSTART BIGINT, WINDOWEND BIGINT, IGNORED BIGINT

It then performs a GROUP BY on the ID column, counting the number of rows and capturing the minimum window start and end times. As it stands, it's a pretty nonsensical query, though there may be useful variants that use different UDAFs. I just can't think of any!
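
To make the intended semantics concrete, here is a rough plain-Java sketch of what the query would compute if it worked. It is not ksqlDB code: the class, the `Row` and `Agg` types and the `aggregate` method are all hypothetical names made up for illustration. It simply keeps a running COUNT(1), MIN(WINDOWSTART) and MIN(WINDOWEND) per ID and emits one update per input row, using the window bounds from the test case inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, not ksqlDB internals.
final class WindowedGroupBySketch {

  // One input row: the ID key plus the window bounds carried by the windowed source.
  static final class Row {
    final long id;
    final long windowStart;
    final long windowEnd;

    Row(long id, long windowStart, long windowEnd) {
      this.id = id;
      this.windowStart = windowStart;
      this.windowEnd = windowEnd;
    }
  }

  // Running aggregate per key: COUNT(1), MIN(WINDOWSTART), MIN(WINDOWEND).
  static final class Agg {
    long count = 0;
    long minStart = Long.MAX_VALUE;
    long minEnd = Long.MAX_VALUE;
  }

  // Emits one "key -> count,wstart,wend" update per input row, like a table changelog.
  static List<String> aggregate(List<Row> rows) {
    Map<Long, Agg> state = new LinkedHashMap<>();
    List<String> updates = new ArrayList<>();
    for (Row row : rows) {
      Agg agg = state.computeIfAbsent(row.id, k -> new Agg());
      agg.count++;
      agg.minStart = Math.min(agg.minStart, row.windowStart);
      agg.minEnd = Math.min(agg.minEnd, row.windowEnd);
      updates.add(row.id + " -> " + agg.count + "," + agg.minStart + "," + agg.minEnd);
    }
    return updates;
  }

  public static void main(String[] args) {
    // Same keys and window bounds as the "inputs" of the QTT test case.
    List<Row> rows = Arrays.asList(
        new Row(0, 0, 30000),
        new Row(0, 0, 30000),
        new Row(0, 0, 30000),
        new Row(1, 30000, 60000));
    aggregate(rows).forEach(System.out::println);
  }
}
```

The updates this produces line up with the "outputs" values in the test case (`1,0,30000`, `2,0,30000`, `3,0,30000` for key 0, then `1,30000,60000` for key 1).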

Really, such a query should probably just work, so solution 1 is 'make it work'. Alternatively, we can punt on fixing this until some later time (maybe after structured keys work), in which case we should split this into two tasks. Task 1: improve the error message, i.e. replace the UnsupportedOperationException with a single KsqlException stating "GROUP BY on a windowed source is not supported". Task 2: actually make it work.
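
As a rough idea of the error-message task, something along these lines might do. This is only a sketch under assumptions: the `KsqlException` below is a local stand-in so the snippet compiles on its own, and `checkGroupBySupported` and its `sourceIsWindowed` flag are hypothetical names, not the actual code in `SchemaKStream.groupByKey`.

```java
// Hypothetical sketch of the improved error; not the actual ksqlDB implementation.
final class GroupByErrorSketch {

  // Local stand-in for the project's KsqlException so this compiles standalone.
  static final class KsqlException extends RuntimeException {
    KsqlException(String message) {
      super(message);
    }
  }

  // Hypothetical guard: fail with a user-facing message instead of an
  // UnsupportedOperationException that exposes an internal invariant.
  static void checkGroupBySupported(boolean sourceIsWindowed) {
    if (sourceIsWindowed) {
      throw new KsqlException("GROUP BY on a windowed source is not supported");
    }
  }

  public static void main(String[] args) {
    try {
      checkGroupBySupported(true);
    } catch (KsqlException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The point is just that the user sees what is unsupported rather than "Group by on windowed should always require rekey"; where the check should live is left open.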

vpapavas added this to the 0.8.0 milestone Feb 12, 2020
vcrfxia modified the milestones: 0.8.0, 0.9.0 Mar 18, 2020
derekjn added the bug label Mar 26, 2020
spena modified the milestones: 0.9.0, 0.10.0 Apr 27, 2020
big-andy-coates modified the milestones: 0.10.0, 0.11.0 Jun 17, 2020
agavra added the P0 (Denotes must-have for a given milestone) label Jun 24, 2020
agavra changed the title from "Join of windowed sources results in UnsupportedOperationException" to "Group By of windowed sources results in UnsupportedOperationException" Jun 24, 2020
agavra removed this from the 0.11.0 milestone Jun 24, 2020