
CommandRunner fails to serialize joins with select * and WHERE clause #5503

Closed
vcrfxia opened this issue May 28, 2020 · 4 comments
Labels
blocker bug P0 Denotes must-have for a given milestone

@vcrfxia
Contributor

vcrfxia commented May 28, 2020

Describe the bug

Certain types of joins cause a serialization exception in the command runner. This triggers #5502, which leaves the ksqlDB cluster unable to accept further requests.
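One way to see why a single bad command can stall the whole cluster: the CLI acknowledges a statement once it has been written to the command topic, and the CommandRunner thread decodes and applies it only later. The following is a simplified, single-threaded Java model of that pattern, not ksqlDB code. `CommandLogModel`, its methods, and the `decoder` function are hypothetical. The model assumes commands are applied strictly in order, and it leaves out Kafka, offsets, and threading entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, single-threaded model of a command log (not ksqlDB code).
// Writers append raw records and get an immediate acknowledgement; a runner
// later decodes and executes them strictly in order.
final class CommandLogModel {
  private final List<String> log = new ArrayList<>();
  private final List<String> executed = new ArrayList<>();
  private final Function<String, String> decoder; // throws on a malformed record
  private int position = 0; // index of the next record the runner will read

  CommandLogModel(Function<String, String> decoder) {
    this.decoder = decoder;
  }

  // Mirrors the CLI's response: the client is told the write succeeded
  // before anything has tried to decode the record.
  String submit(String record) {
    log.add(record);
    return "Statement written to command topic";
  }

  // Processes pending records in order. On a decode failure it stops without
  // advancing, so the bad record and everything queued after it stay pending.
  void runPending() {
    while (position < log.size()) {
      String command;
      try {
        command = decoder.apply(log.get(position));
      } catch (RuntimeException e) {
        return; // in the reported bug, the CommandRunner thread died at this point
      }
      executed.add(command);
      position++;
    }
  }

  List<String> executed() {
    return List.copyOf(executed);
  }

  int position() {
    return position;
  }
}
```

Submitting a good record, an undecodable one, and another good one returns the same acknowledgement each time, but only the first command ever runs.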

To Reproduce

  1. Create a couple of sources:
create table users (rowkey varchar primary key, namesp varchar, email varchar) with (kafka_topic='users',value_format='json');
create stream pageviews (userid varchar, namesp varchar, pageid varchar) with (kafka_topic='pageviews',value_format='json',partitions=1);
  2. Issue a join with a scoped select (p.*) and a WHERE clause:
create stream joined as select p.* from pageviews p join users u on p.userid=u.rowkey where p.namesp='n1' emit changes;

Expected behavior

The query succeeds.

Actual behaviour

The CLI prints

 Message
------------------------------------
 Statement written to command topic
------------------------------------

but the server logs clearly show the request failed:

ksqldb-server    | Exception in thread "CommandRunner" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-foo2_command_topic-0 at offset 6. If needed, please seek past the record to continue consumption.
ksqldb-server    | Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.MismatchedInputException: Missing required creator property 'selectExpressions' (index 2)
ksqldb-server    |  at [Source: (byte[])"{"statement":"CREATE STREAM JOINED WITH (KAFKA_TOPIC='JOINED', PARTITIONS=1, REPLICAS=1) AS SELECT P.*\nFROM PAGEVIEWS P\nINNER JOIN USERS U ON ((P.USERID = U.ROWKEY))\nWHERE (P.NAMESP = 'n1')\nEMIT CHANGES;","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.cache.max.bytes.buffering":"10000000","ksql.transient.prefix":"transient_","ksql.persistence.wrap.single.values":"true","ksql.authorization.cache.expiry.time.secs":"30","ksql.streams.default.deserialization.exception.handler":"i"[truncated 3798 bytes]; line: 1, column: 4160] (through reference chain: io.confluent.ksql.rest.server.computation.Command["plan"]->io.confluent.ksql.engine.KsqlPlanV1["queryPlan"]->io.confluent.ksql.engine.QueryPlan["physicalPlan"]->io.confluent.ksql.execution.plan.StreamSink["source"]->io.confluent.ksql.execution.plan.StreamSelect["selectExpressions"])
ksqldb-server    | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Missing required creator property 'selectExpressions' (index 2)
ksqldb-server    |  at [Source: (byte[])"{"statement":"CREATE STREAM JOINED WITH (KAFKA_TOPIC='JOINED', PARTITIONS=1, REPLICAS=1) AS SELECT P.*\nFROM PAGEVIEWS P\nINNER JOIN USERS U ON ((P.USERID = U.ROWKEY))\nWHERE (P.NAMESP = 'n1')\nEMIT CHANGES;","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.cache.max.bytes.buffering":"10000000","ksql.transient.prefix":"transient_","ksql.persistence.wrap.single.values":"true","ksql.authorization.cache.expiry.time.secs":"30","ksql.streams.default.deserialization.exception.handler":"i"[truncated 3798 bytes]; line: 1, column: 4160] (through reference chain: io.confluent.ksql.rest.server.computation.Command["plan"]->io.confluent.ksql.engine.KsqlPlanV1["queryPlan"]->io.confluent.ksql.engine.QueryPlan["physicalPlan"]->io.confluent.ksql.execution.plan.StreamSink["source"]->io.confluent.ksql.execution.plan.StreamSelect["selectExpressions"])
ksqldb-server    | 	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
ksqldb-server    | 	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1456)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer._findMissing(PropertyValueBuffer.java:194)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer.getParameters(PropertyValueBuffer.java:160)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromObjectWith(ValueInstantiator.java:229)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:198)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:488)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:194)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161)
ksqldb-server    | 	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:130)
ksqldb-server    | 	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:97)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:254)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:527)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:194)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161)
ksqldb-server    | 	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:130)
ksqldb-server    | 	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:97)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:254)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:527)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.std.ReferenceTypeDeserializer.deserialize(ReferenceTypeDeserializer.java:185)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:194)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161)
ksqldb-server    | 	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:130)
ksqldb-server    | 	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:97)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:254)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.std.ReferenceTypeDeserializer.deserialize(ReferenceTypeDeserializer.java:186)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server    | 	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
ksqldb-server    | 	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
ksqldb-server    | 	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3266)
ksqldb-server    | 	at io.confluent.ksql.rest.server.computation.InternalTopicSerdes$InternalTopicDeserializer.deserialize(InternalTopicSerdes.java:60)
ksqldb-server    | 	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1311)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1542)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1378)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
ksqldb-server    | 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
ksqldb-server    | 	at io.confluent.ksql.rest.server.CommandTopic.getNewCommands(CommandTopic.java:77)
ksqldb-server    | 	at io.confluent.ksql.rest.server.computation.CommandStore.getNewCommands(CommandStore.java:192)
ksqldb-server    | 	at io.confluent.ksql.rest.server.computation.CommandRunner.fetchAndRunCommands(CommandRunner.java:209)
ksqldb-server    | 	at io.confluent.ksql.rest.server.computation.CommandRunner$Runner.run(CommandRunner.java:295)
ksqldb-server    | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
ksqldb-server    | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
ksqldb-server    | 	at java.base/java.lang.Thread.run(Thread.java:834)
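The root cause at the bottom of the trace is Jackson's `MismatchedInputException: Missing required creator property 'selectExpressions' (index 2)`. The `StreamSelect` step in the serialized plan has no `selectExpressions` field, but the creator used to rebuild that step requires one. Reading the record back therefore fails, even though writing it (at offset 6 of the command topic) succeeded. The stdlib-only Java sketch below mimics that required-property check, with the JSON object modelled as a `Map`. It is an illustration, not Jackson or ksqlDB code: `StreamSelectPlan`, `fromJson`, and the `properties`/`source` parameter names are hypothetical. Only `selectExpressions` and its index 2 come from the log.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a plan step read back from the command topic.
// NOT ksqlDB or Jackson code: it only mimics the check Jackson performs
// before calling a creator whose parameters are all required.
final class StreamSelectPlan {
  // Creator parameters in declaration order; only "selectExpressions" (index 2)
  // is taken from the log, the other names are illustrative.
  static final List<String> REQUIRED = List.of("properties", "source", "selectExpressions");

  final Object properties;
  final Object source;
  final List<?> selectExpressions;

  private StreamSelectPlan(Object properties, Object source, List<?> selectExpressions) {
    this.properties = properties;
    this.source = source;
    this.selectExpressions = selectExpressions;
  }

  // Fails if any required property is absent, reporting the missing name and its
  // creator index in a message shaped like the one in the log above.
  static StreamSelectPlan fromJson(Map<String, Object> json) {
    for (int i = 0; i < REQUIRED.size(); i++) {
      String name = REQUIRED.get(i);
      if (!json.containsKey(name)) {
        throw new IllegalArgumentException(
            "Missing required creator property '" + name + "' (index " + i + ")");
      }
    }
    return new StreamSelectPlan(
        json.get("properties"), json.get("source"), (List<?>) json.get("selectExpressions"));
  }
}
```

A record that carries all three fields deserializes normally. The same record without `selectExpressions` is rejected on read, which matches what the CommandRunner hit.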

Additional context

This definitely happens on 0.9.0. It probably also happens on master and on earlier versions, but I'd need to confirm.

@big-andy-coates
Contributor

Already fixed. The join now returns "The projection contains no value columns."

@vcrfxia
Contributor Author

vcrfxia commented Jun 24, 2020

@big-andy-coates are you saying the query syntax is invalid? Why is the behavior not to select all value columns from p?

@big-andy-coates
Contributor

Thanks for catching this, Victoria. I have to admit I just pasted the SQL into the latest code and made sure the serialization error didn't occur.

Given:

create table users (
  rowkey varchar primary key,
  namesp varchar,
  email varchar
) with (
  kafka_topic='users',
  value_format='json'
);

create stream pageviews (
  userid varchar,
  namesp varchar,
  pageid varchar
) with (
  kafka_topic='pageviews',
  value_format='json',
  partitions=1
);

create stream joined as
  select
    p.*
  from pageviews p
    join users u on p.userid=u.rowkey
  where p.namesp='n1'
  emit changes;

p.* should expand to p.userid, p.namesp and p.pageid, with p.userid being the key column.
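Stated concretely, the expected behaviour is this: a scoped `p.*` expands to every column of the source aliased `p`, in schema order, and the WHERE clause should play no part in that expansion. Below is a minimal, hypothetical Java sketch. `ScopedStarExpander` is not a ksqlDB class. It expands scoped stars against a map from upper-case alias to column names and upper-cases unquoted identifiers, as the statement text in the server log does. It ignores quoted identifiers and does not model which column becomes the key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch, not ksqlDB's implementation: expands scoped "alias.*" items
// into explicit column references using a map of upper-case source alias -> columns
// in schema order. Unquoted identifiers are upper-cased; quoted ones are not handled.
final class ScopedStarExpander {
  static List<String> expand(List<String> selectItems, Map<String, List<String>> columnsByAlias) {
    List<String> expanded = new ArrayList<>();
    for (String item : selectItems) {
      String upper = item.trim().toUpperCase(Locale.ROOT);
      if (upper.endsWith(".*")) {
        String alias = upper.substring(0, upper.length() - 2);
        List<String> columns = columnsByAlias.get(alias);
        if (columns == null) {
          throw new IllegalArgumentException("Unknown source alias: " + alias);
        }
        // No WHERE clause is consulted: expansion depends only on the source schema.
        for (String column : columns) {
          expanded.add(alias + "." + column.toUpperCase(Locale.ROOT));
        }
      } else {
        expanded.add(upper); // other items pass through, upper-cased
      }
    }
    return expanded;
  }
}
```

For the `pageviews p` / `users u` sources above, expanding `p.*` yields `P.USERID`, `P.NAMESP`, `P.PAGEID`, which is never an empty projection.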

I'll look into this more.

@big-andy-coates
Contributor

big-andy-coates commented Jun 24, 2020

This is a regression! Needs fixing in 6.0.0 and 0.10.

The implications of this are not that it crashes the command runner thread, as would previously have been the case. Instead, the implications are that queries that users could previously run will now fail. Existing running queries will not be affected.

Affected queries are any that combine:

  • A join
  • with a projection using a scoped 'all columns', e.g. A.*
  • with a where clause

e.g.

SELECT A.*, B.Id 
   FROM A
     JOIN B ON A.Id = B.userId
   WHERE A.x < 10;
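All three conditions must hold at once for a query to be affected. The following hypothetical triage helper encodes them. It assumes the caller has already parsed the statement into a join flag, a list of projection items, and a WHERE flag. `ScopedStarRegressionCheck` is illustrative only and is not part of ksqlDB, and it recognises only unquoted identifiers.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical triage helper (not ksqlDB code) encoding the three conditions above.
// Inputs are assumed to come from an already-parsed query.
final class ScopedStarRegressionCheck {
  // Matches a scoped "all columns" item such as "A.*" (unquoted identifiers only).
  private static final Pattern SCOPED_STAR = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*\\.\\*");

  static boolean isAffected(boolean hasJoin, List<String> selectItems, boolean hasWhere) {
    boolean hasScopedStar = selectItems.stream()
        .anyMatch(item -> SCOPED_STAR.matcher(item.trim()).matches());
    return hasJoin && hasScopedStar && hasWhere;
  }
}
```

For the example query, `isAffected(true, List.of("A.*", "B.Id"), true)` returns `true`. Dropping the WHERE clause or the join makes it return `false`. An unscoped `*` is not matched, since the list above only names scoped stars.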

Marking as a blocker, as this needs to be included in the next patch releases.

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Jun 25, 2020
confluentinc#5503

This fixes a regression introduced in 0.10.

The implications of this are not that it crashes the command runner thread, as the original ticket states. Instead, the implications are that queries users could previously run will now fail. Existing running queries will not be affected.

Affected queries are any that combine:

* A join
* with a projection using a scoped 'all columns', e.g. A.*
* with a where clause

e.g.

```
SELECT A.*, B.Id
   FROM A
     JOIN B ON A.Id = B.userId
   WHERE A.x < 10;
```
big-andy-coates modified the milestones: 0.11.0, 0.10.1 Jun 29, 2020
big-andy-coates added a commit that referenced this issue Jun 30, 2020
#5503
Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>