Skip to content

Commit

Permalink
fix: properly set key when partition by ROWKEY and join on non-ROWKEY (
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Dec 10, 2019
1 parent 4dd76ac commit 6c80941
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,11 @@ private boolean needsRepartition(final Expression expression) {
return !namesMatch && !isRowKey(columnRef);
}

private static boolean isRowKey(final ColumnRef fieldName) {
return fieldName.name().equals(SchemaUtil.ROWKEY_NAME);
private boolean isRowKey(final ColumnRef fieldName) {
// until we support structured keys, there will never be any key column other
// than "ROWKEY" - furthermore, that key column is always prefixed at this point
// unless it is a join, in which case every other source field is prefixed
return fieldName.equals(schema.key().get(0).ref());
}

private static ColumnName fieldNameFromExpression(final Expression expression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,109 @@
"inputs": [{"topic": "input", "value": {"ID": 22}, "timestamp": 10}],
"outputs": [{"topic": "OUTPUT", "key": "10", "value": {"ID": 22}, "timestamp": 10}]
},
{
"name": "partition by ROWKEY in join on ROWKEY",
"statements": [
"CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');",
"CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');",
"CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.C PARTITION BY L.ROWKEY;"
],
"inputs": [
{"topic": "LEFT", "key": "join", "value": {"A": "join", "B": "b"}},
{"topic": "RIGHT", "key": "join", "value": {"C": "join", "D": "d"}}
],
"outputs": [
{"topic": "OUTPUT", "key": "join", "value": {"L_ROWKEY": "join", "R_ROWKEY": "join"}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "keyField": "L_ROWKEY"}
],
"topics": {
"blacklist": ".*-repartition"
}
}
},
{
"name": "partition by ROWKEY in join on non-ROWKEY",
"statements": [
"CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');",
"CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');",
"CREATE STREAM OUTPUT AS SELECT L.A, L.B, R.C, R.D, L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.D PARTITION BY L.ROWKEY;"
"CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.D PARTITION BY L.ROWKEY;"
],
"comments": [
"This test demonstrates a problem when we JOIN on a non-ROWKEY field and then PARTITION BY ",
"a ROWKEY field. Note that the key is 'join' when it should be 'a' and the key-field is 'B' ",
"when it should be 'L_ROWKEY'"
"inputs": [
{"topic": "LEFT", "key": "a", "value": {"A": "a", "B": "join"}},
{"topic": "RIGHT", "key": "c", "value": {"C": "c", "D": "join"}}
],
"outputs": [
{"topic": "OUTPUT", "key": "a", "value": {"L_ROWKEY": "a", "R_ROWKEY": "c"}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "keyField": "L_ROWKEY"}
]
}
},
{
"name": "partition by ROWKEY in join on ROWKEY ALIASED",
"statements": [
"CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');",
"CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');",
"CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.C PARTITION BY L.A;"
],
"inputs": [
{"topic": "LEFT", "key": "join", "value": {"A": "join", "B": "b"}},
{"topic": "RIGHT", "key": "join", "value": {"C": "join", "D": "d"}}
],
"outputs": [
{"topic": "OUTPUT", "key": "join", "value": {"L_ROWKEY": "join", "R_ROWKEY": "join"}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "keyField": null}
],
"topics": {
"blacklist": ".*-repartition"
}
}
},
{
"name": "partition by non-ROWKEY in join on ROWKEY",
"statements": [
"CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');",
"CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');",
"CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.A = R.C PARTITION BY L.B;"
],
"inputs": [
{"topic": "LEFT", "key": "join", "value": {"A": "join", "B": "b"}},
{"topic": "RIGHT", "key": "join", "value": {"C": "join", "D": "d"}}
],
"outputs": [
{"topic": "OUTPUT", "key": "b", "value": {"L_ROWKEY": "join", "R_ROWKEY": "join"}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "keyField": null}
]
}
},
{
"name": "partition by non-ROWKEY in join on non-ROWKEY",
"statements": [
"CREATE STREAM L (A STRING, B STRING) WITH (kafka_topic='LEFT', value_format='JSON', KEY='A');",
"CREATE STREAM R (C STRING, D STRING) WITH (kafka_topic='RIGHT', value_format='JSON', KEY='C');",
"CREATE STREAM OUTPUT AS SELECT L.ROWKEY, R.ROWKEY FROM L JOIN R WITHIN 10 SECONDS ON L.B = R.D PARTITION BY L.B;"
],
"inputs": [
{"topic": "LEFT", "key": "a", "value": {"A": "a", "B": "join"}},
{"topic": "RIGHT", "key": "c", "value": {"C": "c", "D": "join"}}
],
"outputs": [
{"topic": "OUTPUT", "key": "join", "value": {"A": "a", "B": "join", "C": "c", "D": "join", "L_ROWKEY": "a", "R_ROWKEY": "c"}}
{"topic": "OUTPUT", "key": "join", "value": {"L_ROWKEY": "a", "R_ROWKEY": "c"}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "keyField": "B"}
{"name": "OUTPUT", "type": "stream", "keyField": null}
]
}
}
Expand Down

0 comments on commit 6c80941

Please sign in to comment.