Explicit keys (#5533)
* feat: explicit keys

implements: [KLIP-29](#5530)

fixes: #5303
fixes: #4678

This change sees ksqlDB no longer adding an implicit `ROWKEY STRING` key column to created streams or primary key column to created tables when no key column is explicitly provided in the `CREATE` statement.

BREAKING CHANGE

`CREATE TABLE` statements will now fail if no `PRIMARY KEY` column is provided.

For example, a statement such as:

```sql
CREATE TABLE FOO (name STRING) WITH (kafka_topic='foo', value_format='json');
```

This statement must be updated to include a `PRIMARY KEY` definition, for example:

```sql
CREATE TABLE FOO (ID STRING PRIMARY KEY, name STRING) WITH (kafka_topic='foo', value_format='json');
```

If using schema inference, i.e. loading the value columns of the topic from the Schema Registry, the primary key can be provided as a partial schema, e.g.

```sql
-- FOO will have value columns loaded from the Schema Registry
CREATE TABLE FOO (ID INT PRIMARY KEY) WITH (kafka_topic='foo', value_format='avro');
```

`CREATE STREAM` statements that do not define a `KEY` column will no longer have an implicit `ROWKEY` key column.

For example:

```sql
CREATE STREAM BAR (NAME STRING) WITH (...);
```

The above statement would previously have resulted in a stream with two columns: `ROWKEY STRING KEY` and `NAME STRING`.
With this change the above statement will result in a stream with only the `NAME STRING` column.

Streams with no `KEY` column will be serialized to Kafka topics with a `null` key.
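To illustrate the new keyless-stream behavior, the following sketch (topic name and `partitions` setting are hypothetical, not taken from this change) shows a stream declared without a `KEY` column; every record it produces carries a `null` Kafka message key:

```sql
-- BAR has no KEY column, so records written to its backing topic
-- are serialized with a null Kafka message key.
CREATE STREAM BAR (NAME STRING)
    WITH (kafka_topic='bar', value_format='json', partitions=1);

-- Each row inserted here lands in topic 'bar' with key = null.
INSERT INTO BAR (NAME) VALUES ('alice');
```

Such a stream can still be re-keyed later, e.g. with a `PARTITION BY` in a derived query, if a key is needed for joins or aggregations.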


Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
big-andy-coates and big-andy-coates committed Jun 3, 2020
1 parent 420ba80 commit d0db0cf
Showing 283 changed files with 19,620 additions and 223 deletions.
2 changes: 1 addition & 1 deletion design-proposals/klip-29-explicit-keys.md
@@ -2,7 +2,7 @@

**Author**: @big-andy-coates |
**Release Target**: 0.10.0 |
**Status**: In Discussion |
**Status**: Merged |
**Discussion**: [Github PR](https://github.com/confluentinc/ksql/pull/5530)

**tl;dr:** Up until now ksqlDB has added an implicit `ROWKEY STRING (PRIMARY) KEY` to a `CREATE TABLE`
17 changes: 4 additions & 13 deletions docs/developer-guide/create-a-stream.md
@@ -90,31 +90,22 @@ Your output should resemble:
Name : PAGEVIEWS
Field | Type
--------------------------------------
ROWKEY | VARCHAR(STRING) (key)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
```

You may notice that ksqlDB has added a key column named `ROWKEY`.
This is the default key column that ksqlDB adds if you don't provide one.
If your data doesn't contain a {{ site.ak }} serialized
`STRING` in the {{ site.ak }} message key, don't use `ROWKEY` in your SQL statements,
because this may cause unexpected results.

### Create a Stream with a Specified Key

The previous SQL statement doesn't define a column to represent the data in the
{{ site.ak }} message key in the underlying {{ site.ak }} topic, so the system adds a
`ROWKEY` column with type `STRING`.

If the {{ site.ak }} message key is serialized in a key format that ksqlDB supports (currently `KAFKA`),
{{ site.ak }} message key in the underlying {{ site.ak }} topic. If the {{ site.ak }} message key
is serialized in a key format that ksqlDB supports (currently `KAFKA`),
you can specify the key in the column list of the CREATE STREAM statement.

For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the `viewtime`,
so you can write the CREATE STREAM statement like this:
For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the
`viewtime`, so you can write the CREATE STREAM statement like this:

```sql
CREATE STREAM pageviews_withkey
9 changes: 3 additions & 6 deletions docs/developer-guide/ksqldb-reference/create-stream.md
@@ -45,8 +45,7 @@ Each column is defined by:
If a column is not marked as a `KEY` column, ksqlDB loads it from the Kafka message's value.
Unlike a table's `PRIMARY KEY`, a stream's keys can be NULL.

ksqlDB adds an implicit `ROWKEY` system column to every stream and table, which represents the
corresponding Kafka message key. An implicit `ROWTIME` pseudo column is also available on every
An implicit `ROWTIME` pseudo column is also available on every
stream and table, which represents the corresponding Kafka message timestamp. The timestamp has
milliseconds accuracy, and generally represents the _event time_ of a stream row and the
_last modified time_ of a table row.
@@ -60,7 +59,6 @@ The WITH clause supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a STREAM without an existing topic (the command will fail if the topic does not exist). |
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit `ROWKEY` column (message key). If set, ksqlDB uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. Do not use this hint if the message key format in Kafka is `AVRO` or `JSON`. See [Key Requirements](../syntax-reference.md#key-requirements) for more information. |
| TIMESTAMP | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a `bigint`. If it is set, then the TIMESTAMP field must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the value schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single field.<br>If set to `true`, ksqlDB expects the field to have been serialized as a named field within a record.<br>If set to `false`, ksqlDB expects the field to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-field schemas where the value can be `null`. For more information, see [Single field (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple fields, will result in an error. |
@@ -83,10 +81,9 @@ Example
```sql
CREATE STREAM pageviews
(
rowkey BIGINT KEY,
page_id BIGINT KEY,
viewtime BIGINT,
user_id VARCHAR,
page_id VARCHAR
user_id VARCHAR
)
WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'my-pageviews-topic');
```
9 changes: 3 additions & 6 deletions docs/developer-guide/ksqldb-reference/create-table.md
@@ -44,9 +44,8 @@ Each column is defined by:
`PRIMARY KEY` columns. If a column is not marked as a `PRIMARY KEY` column ksqlDB loads it
from the Kafka message's value. Unlike a stream's `KEY` column, a table's `PRIMARY KEY` column(s)
are NON NULL. Any records in the Kafka topic with NULL key columns are dropped.

ksqlDB adds an implicit `ROWKEY` system column to every stream and table, which represents the
corresponding Kafka message key. An implicit `ROWTIME` pseudo column is also available on every

An implicit `ROWTIME` pseudo column is also available on every
stream and table, which represents the corresponding Kafka message timestamp. The timestamp has
milliseconds accuracy, and generally represents the _event time_ of a stream row and the
_last modified time_ of a table row.
@@ -60,7 +59,6 @@ The WITH clause supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a TABLE without an existing topic (the command will fail if the topic does not exist). |
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| KEY | **Optimization hint:** If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit `ROWKEY` column (message key). If set, ksqlDB uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. Do not use this hint if the message key format in kafka is `AVRO` or `JSON`. For more information, see [Key Requirements](../syntax-reference.md#key-requirements). |
| TIMESTAMP | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in the Kafka Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a `bigint`. If it is set, then the TIMESTAMP field must be of type varchar and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the values schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single field.<br>If set to `true`, ksqlDB expects the field to have been serialized as named field within a record.<br>If set to `false`, ksqlDB expects the field to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-field schemas where the value can be `null`. For more information, see [Single field (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple fields, will result in an error. |
@@ -79,9 +77,8 @@ Example
```sql
CREATE TABLE users
(
rowkey BIGINT PRIMARY KEY,
id BIGINT PRIMARY KEY,
usertimestamp BIGINT,
user_id VARCHAR,
gender VARCHAR,
region_id VARCHAR
)
@@ -117,6 +117,22 @@ public CreateTableCommand createTableCommand(
final SourceName sourceName = statement.getName();
final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
if (schema.key().isEmpty()) {
final boolean usingSchemaInference = statement.getProperties().getSchemaId().isPresent();

final String additional = usingSchemaInference
? System.lineSeparator()
+ "Use a partial schema to define the primary key and still load the value columns from "
+ "the Schema Registry, for example:"
+ System.lineSeparator()
+ "\tCREATE TABLE " + statement.getName().text() + " (ID INT PRIMARY KEY) WITH (...);"
: "";

throw new KsqlException(
"Tables require a PRIMARY KEY. Please define the PRIMARY KEY." + additional
);
}

final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
ksqlConfig,
statement.getProperties(),
@@ -153,7 +169,7 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
}
});

return tableElements.toLogicalSchema(true);
return tableElements.toLogicalSchema();
}

private static KsqlTopic buildTopic(
@@ -38,7 +38,7 @@ public enum PlanJsonMapper {
new JavaTimeModule(),
new KsqlParserSerializationModule(),
new KsqlTypesSerializationModule(),
new KsqlTypesDeserializationModule(true)
new KsqlTypesDeserializationModule()
)
.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES)
@@ -128,6 +128,11 @@ public Stream<ColumnName> resolveSelectStar(

@Override
void validateKeyPresent(final SourceName sinkName, final Projection projection) {
if (getSchema().key().isEmpty()) {
// No key column.
return;
}

final ColumnName keyName = Iterables.getOnlyElement(getSchema().key()).name();

if (!projection.containsExpression(new QualifiedColumnReferenceExp(getAlias(), keyName))
@@ -68,7 +68,7 @@ private void registerForCreateSource(final ConfiguredStatement<? extends CreateS
// we can assume that the kafka topic is always present in the
// statement properties
registerSchema(
cs.getStatement().getElements().toLogicalSchema(false),
cs.getStatement().getElements().toLogicalSchema(),
cs.getStatement().getProperties().getKafkaTopic(),
cs.getStatement().getProperties().getFormatInfo(),
cs.getConfig(),
@@ -315,6 +315,11 @@ boolean repartitionNotNeeded(final List<Expression> expressions) {
// Note: A repartition is only not required if partitioning by the existing key column, or
// the existing keyField.

if (schema.key().isEmpty()) {
// No current key, so repartition needed:
return false;
}

if (schema.key().size() != 1) {
throw new UnsupportedOperationException("logic only supports single key column");
}
@@ -77,6 +77,10 @@ public class CommandFactoriesTest {
private static final TableElement ELEMENT1 =
tableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING));
private static final TableElements SOME_ELEMENTS = TableElements.of(ELEMENT1);
private static final TableElements ELEMENTS_WITH_PK = TableElements.of(
tableElement(Namespace.PRIMARY_KEY, "k", new Type(SqlTypes.STRING)),
ELEMENT1
);
private static final String TOPIC_NAME = "some topic";
private static final Map<String, Literal> MINIMIM_PROPS = ImmutableMap.of(
CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("JSON"),
@@ -325,7 +329,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromOverridesNotConfi
);

final DdlStatement statement =
new CreateTable(SOME_NAME, SOME_ELEMENTS, true, withProperties);
new CreateTable(SOME_NAME, ELEMENTS_WITH_PK, true, withProperties);

// When:
final DdlCommand cmd = commandFactories
