Skip to content

Commit

Permalink
chore: partition-by primitive key support (#4098)
Browse files Browse the repository at this point in the history
* chore: partition-by primitive key support

Fixes: #4092

WIP: This commit gets `PARTITION BY` clauses working with primitive key types. However, it does disable a couple of join until #4094 has been completed.

BREAKING CHANGE: A `PARTITION BY` now changes the SQL type of `ROWKEY` in the output schema of a query.

For example, consider:

```sql
CREATE STREAM INPUT (ROWKEY STRING KEY, ID INT) WITH (...);
CREATE STREAM OUTPUT AS SELECT ROWKEY AS NAME FROM INPUT PARTITION BY ID;
```

Previously, the above would have resulted in an output schema of `ROWKEY STRING KEY, NAME STRING`, where `ROWKEY` would have stored the string representation of the integer from the `ID` column.  With this commit the output schema will be `ROWKEY INT KEY, NAME STRING`.
  • Loading branch information
big-andy-coates committed Dec 10, 2019
1 parent 6c80941 commit 7addf88
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
import io.confluent.ksql.planner.plan.RepartitionNode;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -201,36 +203,46 @@ private static FilterNode buildFilterNode(
return new FilterNode(new PlanNodeId("WhereFilter"), sourcePlanNode, filterExpression);
}

private static RepartitionNode buildRepartitionNode(
private RepartitionNode buildRepartitionNode(
final PlanNode sourceNode,
final Expression partitionBy
) {
if (!(partitionBy instanceof ColumnReferenceExp)) {
return new RepartitionNode(
new PlanNodeId("PartitionBy"),
sourceNode,
partitionBy,
KeyField.none());
}

final ColumnRef partitionColumn = ((ColumnReferenceExp) partitionBy).getReference();
final LogicalSchema schema = sourceNode.getSchema();

final KeyField keyField;
if (schema.isMetaColumn(partitionColumn.name())) {

if (!(partitionBy instanceof ColumnReferenceExp)) {
keyField = KeyField.none();
} else if (schema.isKeyColumn(partitionColumn.name())) {
keyField = sourceNode.getKeyField();
} else {
keyField = KeyField.of(partitionColumn);
final ColumnRef columnRef = ((ColumnReferenceExp) partitionBy).getReference();
final LogicalSchema sourceSchema = sourceNode.getSchema();

final Column proposedKey = sourceSchema
.findValueColumn(columnRef)
.orElseThrow(() -> new KsqlException("Invalid identifier for PARTITION BY clause: '"
+ columnRef.name().toString(FormatOptions.noEscape()) + "' Only columns from the "
+ "source schema can be referenced in the PARTITION BY clause."));

switch (proposedKey.namespace()) {
case KEY:
keyField = sourceNode.getKeyField();
break;
case VALUE:
keyField = KeyField.of(columnRef);
break;
default:
keyField = KeyField.none();
break;
}
}

final LogicalSchema schema = buildRepartitionedSchema(sourceNode, partitionBy);

return new RepartitionNode(
new PlanNodeId("PartitionBy"),
sourceNode,
schema,
partitionBy,
keyField);

keyField
);
}

private FlatMapNode buildFlatMapNode(final PlanNode sourcePlanNode) {
Expand Down Expand Up @@ -331,4 +343,21 @@ private LogicalSchema buildProjectionSchema(final PlanNode sourcePlanNode) {

return builder.build();
}

private LogicalSchema buildRepartitionedSchema(
final PlanNode sourceNode,
final Expression partitionBy
) {
final LogicalSchema sourceSchema = sourceNode.getSchema();

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry);

final SqlType keyType = typeManager.getExpressionSqlType(partitionBy);

return LogicalSchema.builder()
.keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
.valueColumns(sourceSchema.value())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public JoinNode(
this.schema = JoinParamsFactory.createSchema(left.getSchema(), right.getSchema());

if (schema.key().get(0).type().baseType() != SqlBaseType.STRING) {
throw new KsqlException("GROUP BY is not supported with non-STRING keys");
throw new KsqlException("JOIN is not supported with non-STRING keys");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.planner.plan;

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
Expand All @@ -23,39 +25,35 @@
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Objects;

@Immutable
public class RepartitionNode extends PlanNode {

private final PlanNode source;
private final Expression partitionBy;
private final KeyField keyField;
private final LogicalSchema schema;

public RepartitionNode(
PlanNodeId id,
PlanNode source,
Expression partitionBy,
KeyField keyField
final PlanNodeId id,
final PlanNode source,
final LogicalSchema schema,
final Expression partitionBy,
final KeyField keyField
) {
super(id, source.getNodeOutputType());
this.source = Objects.requireNonNull(source, "source");
this.partitionBy = Objects.requireNonNull(partitionBy, "partitionBy");
this.keyField = Objects.requireNonNull(keyField, "keyField");

if (source.getSchema().key().get(0).type().baseType() != SqlBaseType.STRING) {
throw new KsqlException("GROUP BY is not supported with non-STRING keys");
}
this.source = requireNonNull(source, "source");
this.partitionBy = requireNonNull(partitionBy, "partitionBy");
this.keyField = requireNonNull(keyField, "keyField");
this.schema = requireNonNull(schema, "schema");
}

@Override
public LogicalSchema getSchema() {
return source.getSchema();
return schema;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,12 @@ public void shouldRekeyIfPartitionByDoesNotMatchResultKey() {
final String planText = queryMetadataList.get(1).getExecutionPlan();
final String[] lines = planText.split("\n");
assertThat(lines.length, equalTo(4));
assertThat(lines[0], equalTo(" > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 "
assertThat(lines[0], equalTo(" > [ SINK ] | Schema: [ROWKEY BIGINT KEY, COL0 BIGINT, COL1 STRING, COL2 "
+ "DOUBLE] | Logger: InsertQuery_1.S1"));
assertThat(lines[2],
containsString("[ REKEY ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] "
containsString("[ REKEY ] | Schema: [ROWKEY BIGINT KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] "
+ "| Logger: InsertQuery_1.PartitionBy"));
assertThat(lines[1], containsString("[ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING"
assertThat(lines[1], containsString("[ PROJECT ] | Schema: [ROWKEY BIGINT KEY, COL0 BIGINT, COL1 STRING"
+ ", COL2 DOUBLE] | Logger: InsertQuery_1.Project"));
}

Expand All @@ -316,7 +316,7 @@ public void shouldRepartitionLeftStreamIfNotCorrectKey() {
.get(0);

// Then:
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [TEST2."));
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [ROWKEY DOUBLE KEY, TEST2."));
}

@Test
Expand All @@ -332,7 +332,7 @@ public void shouldRepartitionRightStreamIfNotCorrectKey() {
.get(0);

// Then:
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [TEST3."));
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [ROWKEY STRING KEY, TEST3."));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@

package io.confluent.ksql.execution.util;

import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.SchemaUtil;
import org.apache.kafka.connect.data.ConnectSchema;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -35,11 +39,6 @@ public final class StructKeyUtil {
private static final org.apache.kafka.connect.data.Field ROWKEY_FIELD =
ROWKEY_STRUCT_SCHEMA.fields().get(0);

public static final PersistenceSchema ROWKEY_SERIALIZED_SCHEMA = PersistenceSchema.from(
(ConnectSchema) ROWKEY_STRUCT_SCHEMA,
false
);

private StructKeyUtil() {
}

Expand All @@ -48,4 +47,37 @@ public static Struct asStructKey(String rowKey) {
keyStruct.put(ROWKEY_FIELD, rowKey);
return keyStruct;
}

public static KeyBuilder keySchema(final LogicalSchema schema) {
final List<Column> keyCols = schema.key();
if (keyCols.size() != 1) {
throw new UnsupportedOperationException("Only single keys supported");
}

final SqlType sqlType = keyCols.get(0).type();
final Schema connectSchema = SchemaConverters.sqlToConnectConverter().toConnectSchema(sqlType);

return new KeyBuilder(SchemaBuilder
.struct()
.field(SchemaUtil.ROWKEY_NAME.name(), connectSchema)
.build()
);
}

public static final class KeyBuilder {

private final Schema keySchema;
private final org.apache.kafka.connect.data.Field keyField;

private KeyBuilder(final Schema keySchema) {
this.keySchema = Objects.requireNonNull(keySchema, "keySchema");
this.keyField = keySchema.field(SchemaUtil.ROWKEY_NAME.name());
}

public Struct build(final Object rowKey) {
final Struct keyStruct = new Struct(keySchema);
keyStruct.put(keyField, rowKey);
return keyStruct;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Test;

public class StructKeyUtilTest {

private static final LogicalSchema LOGICAL_SCHEMA = LogicalSchema.builder()
.keyColumn(ColumnName.of("BOB"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of("DOES_NOT_MATTER"), SqlTypes.STRING)
.build();
private KeyBuilder builder;

@Before
public void setUp() {
builder = StructKeyUtil.keySchema(LOGICAL_SCHEMA);
}

@Test(expected = UnsupportedOperationException.class)
public void shouldThrowOnMultipleKeyColumns() {
// Only single key columns initially supported
StructKeyUtil.keySchema(LogicalSchema.builder()
.keyColumn(ColumnName.of("BOB"), SqlTypes.STRING)
.keyColumn(ColumnName.of("JOHN"), SqlTypes.STRING)
.build());
}

@Test
public void shouldBuildCorrectSchema() {
// When:
final Struct result = builder.build(1);

// Then:
assertThat(result.schema(), is(SchemaBuilder.struct()
.field("ROWKEY", Schema.OPTIONAL_INT32_SCHEMA)
.build()));
}

@Test
public void shouldHandleValue() {
// When:
final Struct result = builder.build(1);

// Then:
assertThat(result.getInt32("ROWKEY"), is(1));
}

@Test
public void shouldHandleNulls() {
// When:
final Struct result = builder.build(null);

// Then:
assertThat(result.getInt32("ROWKEY"), is(nullValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
{
"name": "stream stream left join with rowkey - rekey",
"format": ["AVRO", "JSON"],
"enabled": false,
"comment": "disabled until https://github.com/confluentinc/ksql/issues/4094 is done",
"statements": [
"CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}');",
"CREATE STREAM TEST_STREAM (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}');",
Expand Down Expand Up @@ -89,6 +91,8 @@
{
"name": "stream stream left join - rekey",
"format": ["AVRO", "JSON"],
"enabled": false,
"comment": "disabled until https://github.com/confluentinc/ksql/issues/4094 is done",
"statements": [
"CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}');",
"CREATE STREAM TEST_STREAM (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}');",
Expand Down Expand Up @@ -169,6 +173,8 @@
},
{
"name": "stream stream left join - right join key in projection",
"enabled": false,
"comment": "disabled until https://github.com/confluentinc/ksql/issues/4094 is done",
"statements": [
"CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
"CREATE STREAM TEST_STREAM (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
Expand Down Expand Up @@ -204,6 +210,8 @@
},
{
"name": "stream stream left join - both join keys in projection",
"enabled": false,
"comment": "disabled until https://github.com/confluentinc/ksql/issues/4094 is done",
"statements": [
"CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
"CREATE STREAM TEST_STREAM (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
Expand Down Expand Up @@ -1283,6 +1291,8 @@
},
{
"name": "stream to table when neither have key field and joining by table ROWKEY",
"enabled": false,
"comment": "disabled until https://github.com/confluentinc/ksql/issues/4094 is done",
"statements": [
"CREATE STREAM S (ID bigint) WITH (kafka_topic='S', value_format='JSON');",
"CREATE TABLE NO_KEY (ID bigint, NAME string) WITH (kafka_topic='NO_KEY', value_format='JSON');",
Expand Down

0 comments on commit 7addf88

Please sign in to comment.