diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index fd8ccae90f6a..3597d04ce806 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -382,8 +382,11 @@ private boolean needsRepartition(final Expression expression) { return !namesMatch && !isRowKey(columnRef); } - private static boolean isRowKey(final ColumnRef fieldName) { - return fieldName.name().equals(SchemaUtil.ROWKEY_NAME); + private boolean isRowKey(final ColumnRef fieldName) { + // until we support structured keys, there will never be any key column other + // than "ROWKEY" - furthermore, that key column is always prefixed at this point + // unless it is a join, in which case every other source field is prefixed + return fieldName.equals(schema.key().get(0).ref()); } private static ColumnName fieldNameFromExpression(final Expression expression) { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json index 7ff4e9699672..8f9fbfd3c2f9 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -132,28 +132,109 @@ "inputs": [{"topic": "input", "value": {"ID": 22}, "timestamp": 10}], "outputs": [{"topic": "OUTPUT", "key": "10", "value": {"ID": 22}, "timestamp": 10}] }, + { + "name": "partition by ROWKEY in join on ROWKEY", + "statements": [ + "CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');", + "CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');", + "CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.C PARTITION BY L.ROWKEY;" + ], + "inputs": [ + {"topic": "LEFT", "key": "join", "value": {"A": "join", "B": "b"}}, + {"topic": "RIGHT", "key": "join", "value": {"C": "join", "D": "d"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "join", "value": {"L_ROWKEY": "join", "R_ROWKEY": "join"}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "keyField": "L_ROWKEY"} + ], + "topics": { + "blacklist": ".*-repartition" + } + } + }, { "name": "partition by ROWKEY in join on non-ROWKEY", "statements": [ "CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');", "CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');", - "CREATE STREAM OUTPUT AS SELECT L.A, L.B, R.C, R.D, L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.D PARTITION BY L.ROWKEY;" + "CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.D PARTITION BY L.ROWKEY;" ], - "comments": [ - "This test demonstrates a problem when we JOIN on a non-ROWKEY field and then PARTITION BY ", - "a ROWKEY field. Note that the key is 'join' when it should be 'a' and the key-field is 'B' ", - "when it should be 'L_ROWKEY'" + "inputs": [ + {"topic": "LEFT", "key": "a", "value": {"A": "a", "B": "join"}}, + {"topic": "RIGHT", "key": "c", "value": {"C": "c", "D": "join"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "a", "value": {"L_ROWKEY": "a", "R_ROWKEY": "c"}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "keyField": "L_ROWKEY"} + ] + } + }, + { + "name": "partition by ROWKEY in join on ROWKEY ALIASED", + "statements": [ + "CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');", + "CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');", + "CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.C PARTITION BY L.A;" + ], + "inputs": [ + {"topic": "LEFT", "key": "join", "value": {"A": "join", "B": "b"}}, + {"topic": "RIGHT", "key": "join", "value": {"C": "join", "D": "d"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "join", "value": {"L_ROWKEY": "join", "R_ROWKEY": "join"}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "keyField": null} + ], + "topics": { + "blacklist": ".*-repartition" + } + } + }, + { + "name": "partition by non-ROWKEY in join on ROWKEY", + "statements": [ + "CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');", + "CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');", + "CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.C PARTITION BY L.B;" + ], + "inputs": [ + {"topic": "LEFT", "key": "join", "value": {"A": "join", "B": "b"}}, + {"topic": "RIGHT", "key": "join", "value": {"C": "join", "D": "d"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "b", "value": {"L_ROWKEY": "join", "R_ROWKEY": "join"}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "keyField": null} + ] + } + }, + { + "name": "partition by non-ROWKEY in join on non-ROWKEY", + "statements": [ + "CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');", + "CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');", + "CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.D PARTITION BY L.B;" ], "inputs": [ {"topic": "LEFT", "key": "a", "value": {"A": "a", "B": "join"}}, {"topic": "RIGHT", "key": "c", "value": {"C": "c", "D": "join"}} ], "outputs": [ - {"topic": "OUTPUT", "key": "join", "value": {"A": "a", "B": "join", "C": "c", "D": "join", "L_ROWKEY": "a", "R_ROWKEY": "c"}} + {"topic": "OUTPUT", "key": "join", "value": {"L_ROWKEY": "a", "R_ROWKEY": "c"}} ], "post": { "sources": [ - {"name": "OUTPUT", "type": "stream", "keyField": "B"} + {"name": "OUTPUT", "type": "stream", "keyField": null} ] } }