Explicit keys (#5533)
* feat: explicit keys

implements: [KLIP-29](#5530)

fixes: #5303
fixes: #4678

With this change, ksqlDB no longer adds an implicit `ROWKEY STRING` key column to created streams, or an implicit primary key column to created tables, when the `CREATE` statement does not explicitly provide one.

BREAKING CHANGE

`CREATE TABLE` statements will now fail if no `PRIMARY KEY` column is provided.

For example, a statement such as:

```sql
CREATE TABLE FOO (name STRING) WITH (kafka_topic='foo', value_format='json');
```

will need to be updated to include the definition of the PRIMARY KEY, e.g.

```sql
CREATE TABLE FOO (ID STRING PRIMARY KEY, name STRING) WITH (kafka_topic='foo', value_format='json');
```

If using schema inference, i.e. loading the value columns of the topic from the Schema Registry, the primary key can be provided as a partial schema, e.g.

```sql
-- FOO will have value columns loaded from the Schema Registry
CREATE TABLE FOO (ID INT PRIMARY KEY) WITH (kafka_topic='foo', value_format='avro');
```

`CREATE STREAM` statements that do not define a `KEY` column will no longer have an implicit `ROWKEY` key column.

For example:

```sql
CREATE STREAM BAR (NAME STRING) WITH (...);
```

The above statement would previously have resulted in a stream with two columns: `ROWKEY STRING KEY` and `NAME STRING`.
With this change, the above statement results in a stream with only the `NAME STRING` column.

Streams with no KEY column will be serialized to Kafka topics with a `null` key.
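
To retain the previous behaviour, the old implicit column can simply be declared explicitly. A minimal sketch (the `WITH` properties here are illustrative):

```sql
-- Declares the message key as an explicit KEY column, matching the old implicit ROWKEY:
CREATE STREAM BAR (ROWKEY STRING KEY, NAME STRING) WITH (kafka_topic='bar', value_format='json');
```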

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
2 people authored and stevenpyzhang committed Jun 5, 2020
1 parent f4c8a66 commit c2f41b2
Showing 283 changed files with 19,791 additions and 219 deletions.
162 changes: 162 additions & 0 deletions design-proposals/klip-29-explicit-keys.md
@@ -0,0 +1,162 @@
# KLIP-29: Explicit Keys

**Author**: @big-andy-coates |
**Release Target**: 0.10.0 |
**Status**: Merged |
**Discussion**: [Github PR](https://github.com/confluentinc/ksql/pull/5530)

**tl;dr:** Up until now ksqlDB has added an implicit `ROWKEY STRING (PRIMARY) KEY` to a `CREATE TABLE`
or `CREATE STREAM` statement that does not explicitly define a key column. This KLIP proposes
removing this implicit column creation and instead requiring tables to define their PRIMARY KEY, and
changing `CREATE STREAM` to create a stream without a KEY column, should none be defined.

## Motivation and background

Implicitly adding columns is confusing for users, who are left wondering where the column came from.
For example:

```
ksql> CREATE STREAM S (ID INT, NAME STRING) WITH (...);
Stream S Created.
ksql> CREATE STREAM S2 AS SELECT ID, NAME FROM S;
Key missing from projection. See https://cnfl.io/2LV7ouS.
The query used to build `S2` must include the key column ROWKEY in its projection.
```

Understandably, the user may be left wondering where this `ROWKEY` column came from and why they need to
add it to the projection.
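
Before this change, the only way to satisfy that check was to add the implicit column to the projection; a sketch of the pre-KLIP workaround:

```
ksql> CREATE STREAM S2 AS SELECT ROWKEY, ID, NAME FROM S;
```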

Now that ksqlDB supports more than just `STRING` key columns, it no longer makes sense to add an
implicit `STRING` key column by default. Better to let the user define _if_ there is a key column, and if
so, what its name and type are.

There is no semantic need for a STREAM to have data in the key, and hence no need for a KEY column to be
defined. A key-less stream is also useful when the data in the key is not in a format
ksqlDB can work with, e.g. an Avro, JSON or Protobuf key, or any other format. It seems unwieldy and
confusing to add an implicit `STRING` key column if the actual key is empty or is not something ksqlDB
can read.
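
For example, a topic whose key is Avro-serialized can now be modelled by declaring only value columns; the key bytes are simply never read. A sketch (topic and column names are hypothetical):

```sql
CREATE STREAM EVENTS (PAYLOAD STRING) WITH (kafka_topic='events', value_format='json');
```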

Unlike STREAMs, TABLEs require a PRIMARY KEY column. Therefore, we should require the user to provide one.

## What is in scope

* `CREATE TABLE` statements will require a `PRIMARY KEY` column to be defined.
* `CREATE STREAM` statements without a `KEY` column defined will define a stream without a key column.

## What is not in scope

* Everything else.

## Value/Return

Removal of implicitly added columns makes statements more declarative. It makes it possible to describe
a stream with no data, or incompatible data, in the key, and it makes users _think_ about the name and
type of the PRIMARY KEY column of their tables.

Combined, these changes make the language easier to use and richer.

## Public APIS

### CREATE TABLE changes

A `CREATE TABLE` statement that does not define a `PRIMARY KEY` column will now result in an error:

```
ksql> CREATE TABLE INPUT (A INT, B STRING) WITH (...);
Tables require a PRIMARY KEY. Please define the PRIMARY KEY.
```

Statements with a `PRIMARY KEY` column will continue to work as before.
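
For example, a statement like the following is unaffected (a sketch; names and properties are illustrative):

```sql
CREATE TABLE INPUT (ID INT PRIMARY KEY, A INT, B STRING) WITH (kafka_topic='input', value_format='json');
```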

Where schema inference is used, the error message will include an example of how to define a partial
schema to add the primary key:

```
ksql> CREATE TABLE INPUT WITH (value_format='Avro', ...);
Tables require a PRIMARY KEY. Please define the PRIMARY KEY.
You can define just the primary key and still load the value columns from the Schema Registry, for example:
CREATE TABLE INPUT (ID INT PRIMARY KEY) WITH (value_format='Avro', ...);
```

### CREATE STREAM changes

A `CREATE STREAM` statement that does not define a `KEY` column will now result in a stream with no
data being read from the Kafka message's key.

```sql
CREATE STREAM INPUT (A INT, B STRING) WITH (...);
-- Results in INPUT with schema: A INT, B STRING, i.e. no key column.
```

Statements with a `KEY` column will continue to work as before.
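
For example (a sketch; names and properties are illustrative):

```sql
CREATE STREAM INPUT (K INT KEY, A INT, B STRING) WITH (kafka_topic='input', value_format='json');
-- Results in INPUT with schema: K INT KEY, A INT, B STRING.
```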

## Design

This is mainly a syntax-only change, as detailed above.

Streams without a key column will work just like any other stream. However, `GROUP BY`, `PARTITION BY`
and `JOIN`s will _always_ result in a repartition, as the grouping, partitioning or joining expression
can never be the key column.
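
For example, grouping a key-less stream can never match an existing key column, so an internal repartition topic is always created first. A sketch (assuming the v0.10 syntax; names are illustrative):

```sql
-- INPUT has no key column, so this GROUP BY always forces a repartition:
CREATE TABLE COUNTS AS SELECT A, COUNT(*) AS CNT FROM INPUT GROUP BY A EMIT CHANGES;
```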

Internally, ksqlDB and the Kafka Streams library it builds on rely heavily on a key-value model.
Where a stream is created without a key column, internally ksqlDB will treat the key as a `Void` type,
and the key will always deserialize to `null`, regardless of the binary payload in the Kafka message's
key.

Likewise, when a row from a stream without a key is persisted to Kafka, the key will be serialized as
`null`.
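
A minimal sketch of the observable behaviour (stream name, topic, and values are hypothetical):

```sql
CREATE STREAM NO_KEY (A INT, B STRING) WITH (kafka_topic='no_key', value_format='json', partitions=1);
INSERT INTO NO_KEY (A, B) VALUES (1, 'x');
-- The record written to the 'no_key' topic has a null key and the value {"A":1,"B":"x"}.
```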

## Test plan

Ensure coverage of key-less streams in QTT tests, especially for `GROUP BY`, `PARTITION BY`, and `JOIN`s.

## LOEs and Delivery Milestones

Single deliverable, with a low LOE (a prototype is already working).

## Documentation Updates

Suitable doc updates for the `CREATE TABLE` and `CREATE STREAM` statements will be done as part of the KLIP.

Updates to the rest of the ksqlDB docs, the Kafka tutorials microsite, and the Examples repo will be done
in tandem with other syntax changes.

The release notes will call out this change in behaviour.

## Compatibility Implications

CREATE statements submitted on previous versions of ksqlDB will continue to work as expected.

Users submitting previously written statements may see `CREATE TABLE` statements that previously ran now
fail, and may see `CREATE STREAM` statements create streams without a `ROWKEY STRING KEY` column.

Users receiving an error when their `CREATE TABLE` statements fail will need to update their statements
to include a suitable `PRIMARY KEY` column. Where the statement already contains the column set, the
addition of the `PRIMARY KEY` column should be simple. However, users may be more confused when the
statement is making use of schema inference, i.e. loading the value columns from the Schema Registry,
for example:

```sql
-- existing create statement that loads the value columns from the Schema Registry:
CREATE TABLE OUTPUT WITH (value_format='Avro', kafka_topic='input');
```

As ksqlDB does not _yet_ support loading the key schema from the Schema Registry, the user must now
supply the `PRIMARY KEY` in a _partial schema_. (_Partial schema_ support was added in v0.9):

```sql
-- updated create statement that loads the value columns from the Schema Registry:
CREATE TABLE OUTPUT (ID INT PRIMARY KEY) WITH (value_format='Avro', kafka_topic='input');
```

The error message shown to users when the primary key is missing will include information about
partial schemas if the statement is using schema inference.

The change to `CREATE STREAM` is more subtle, as nothing will fail. However, users can either elect to
stick with the key-less stream, or add a suitable `KEY` column, depending on which best fits their
use-case.
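
For example, given the old two-column stream, both options are expressible (a sketch; properties are illustrative):

```sql
-- Option 1: stick with the key-less stream; the message key is ignored.
CREATE STREAM BAR (NAME STRING) WITH (kafka_topic='bar', value_format='json');

-- Option 2: surface the message key as an explicit KEY column.
CREATE STREAM BAR2 (ID STRING KEY, NAME STRING) WITH (kafka_topic='bar', value_format='json');
```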


## Security Implications

None.
17 changes: 4 additions & 13 deletions docs/developer-guide/create-a-stream.md
@@ -90,31 +90,22 @@ Your output should resemble:
Name : PAGEVIEWS
Field | Type
--------------------------------------
ROWKEY | VARCHAR(STRING) (key)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
```

You may notice that ksqlDB has added a key column named `ROWKEY`.
This is the default key column that ksqlDB adds if you don't provide one.
If your data doesn't contain a {{ site.ak }} serialized
`STRING` in the {{ site.ak }} message key, don't use `ROWKEY` in your SQL statements,
because this may cause unexpected results.

### Create a Stream with a Specified Key

The previous SQL statement doesn't define a column to represent the data in the
{{ site.ak }} message key in the underlying {{ site.ak }} topic, so the system adds a
`ROWKEY` column with type `STRING`.

If the {{ site.ak }} message key is serialized in a key format that ksqlDB supports (currently `KAFKA`),
{{ site.ak }} message key in the underlying {{ site.ak }} topic. If the {{ site.ak }} message key
is serialized in a key format that ksqlDB supports (currently `KAFKA`),
you can specify the key in the column list of the CREATE STREAM statement.

For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the `viewtime`,
so you can write the CREATE STREAM statement like this:
For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the
`viewtime`, so you can write the CREATE STREAM statement like this:

```sql
CREATE STREAM pageviews_withkey
9 changes: 3 additions & 6 deletions docs/developer-guide/ksqldb-reference/create-stream.md
@@ -45,8 +45,7 @@ Each column is defined by:
If a column is not marked as a `KEY` column, ksqlDB loads it from the Kafka message's value.
Unlike a table's `PRIMARY KEY`, a stream's keys can be NULL.

ksqlDB adds an implicit `ROWKEY` system column to every stream and table, which represents the
corresponding Kafka message key. An implicit `ROWTIME` pseudo column is also available on every
An implicit `ROWTIME` pseudo column is available on every
stream and table, which represents the corresponding Kafka message timestamp. The timestamp has
milliseconds accuracy, and generally represents the _event time_ of a stream row and the
_last modified time_ of a table row.
@@ -60,7 +59,6 @@ The WITH clause supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a STREAM without an existing topic (the command will fail if the topic does not exist). |
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit `ROWKEY` column (message key). If set, ksqlDB uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. Do not use this hint if the message key format in Kafka is `AVRO` or `JSON`. See [Key Requirements](../syntax-reference.md#key-requirements) for more information. |
| TIMESTAMP | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a `bigint`. If it is set, then the TIMESTAMP field must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the value schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single field.<br>If set to `true`, ksqlDB expects the field to have been serialized as a named field within a record.<br>If set to `false`, ksqlDB expects the field to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-field schemas where the value can be `null`. For more information, see [Single field (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple fields, will result in an error. |
@@ -83,10 +81,9 @@ Example
```sql
CREATE STREAM pageviews
(
rowkey BIGINT KEY,
page_id BIGINT KEY,
viewtime BIGINT,
user_id VARCHAR,
page_id VARCHAR
user_id VARCHAR
)
WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'my-pageviews-topic');
```
9 changes: 3 additions & 6 deletions docs/developer-guide/ksqldb-reference/create-table.md
@@ -44,9 +44,8 @@ Each column is defined by:
`PRIMARY KEY` columns. If a column is not marked as a `PRIMARY KEY` column ksqlDB loads it
from the Kafka message's value. Unlike a stream's `KEY` column, a table's `PRIMARY KEY` column(s)
are NON NULL. Any records in the Kafka topic with NULL key columns are dropped.

ksqlDB adds an implicit `ROWKEY` system column to every stream and table, which represents the
corresponding Kafka message key. An implicit `ROWTIME` pseudo column is also available on every

An implicit `ROWTIME` pseudo column is available on every
stream and table, which represents the corresponding Kafka message timestamp. The timestamp has
milliseconds accuracy, and generally represents the _event time_ of a stream row and the
_last modified time_ of a table row.
@@ -60,7 +59,6 @@ The WITH clause supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a TABLE without an existing topic (the command will fail if the topic does not exist). |
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| KEY | **Optimization hint:** If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit `ROWKEY` column (message key). If set, ksqlDB uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. Do not use this hint if the message key format in kafka is `AVRO` or `JSON`. For more information, see [Key Requirements](../syntax-reference.md#key-requirements). |
| TIMESTAMP | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in the Kafka Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a `bigint`. If it is set, then the TIMESTAMP field must be of type varchar and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the values schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single field.<br>If set to `true`, ksqlDB expects the field to have been serialized as named field within a record.<br>If set to `false`, ksqlDB expects the field to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-field schemas where the value can be `null`. For more information, see [Single field (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple fields, will result in an error. |
@@ -79,9 +77,8 @@ Example
```sql
CREATE TABLE users
(
rowkey BIGINT PRIMARY KEY,
id BIGINT PRIMARY KEY,
usertimestamp BIGINT,
user_id VARCHAR,
gender VARCHAR,
region_id VARCHAR
)
@@ -117,6 +117,22 @@ public CreateTableCommand createTableCommand(
final SourceName sourceName = statement.getName();
final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
if (schema.key().isEmpty()) {
final boolean usingSchemaInference = statement.getProperties().getSchemaId().isPresent();

final String additional = usingSchemaInference
? System.lineSeparator()
+ "Use a partial schema to define the primary key and still load the value columns from "
+ "the Schema Registry, for example:"
+ System.lineSeparator()
+ "\tCREATE TABLE " + statement.getName().text() + " (ID INT PRIMARY KEY) WITH (...);"
: "";

throw new KsqlException(
"Tables require a PRIMARY KEY. Please define the PRIMARY KEY." + additional
);
}

final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
ksqlConfig,
statement.getProperties(),
@@ -153,7 +169,7 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
}
});

return tableElements.toLogicalSchema(true);
return tableElements.toLogicalSchema();
}

private static KsqlTopic buildTopic(
@@ -38,7 +38,7 @@ public enum PlanJsonMapper {
new JavaTimeModule(),
new KsqlParserSerializationModule(),
new KsqlTypesSerializationModule(),
new KsqlTypesDeserializationModule(true)
new KsqlTypesDeserializationModule()
)
.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES)
@@ -128,6 +128,11 @@ public Stream<ColumnName> resolveSelectStar(

@Override
void validateKeyPresent(final SourceName sinkName, final Projection projection) {
if (getSchema().key().isEmpty()) {
// No key column.
return;
}

final ColumnName keyName = Iterables.getOnlyElement(getSchema().key()).name();

if (!projection.containsExpression(new QualifiedColumnReferenceExp(getAlias(), keyName))
@@ -68,7 +68,7 @@ private void registerForCreateSource(final ConfiguredStatement<? extends CreateS
// we can assume that the kafka topic is always present in the
// statement properties
registerSchema(
cs.getStatement().getElements().toLogicalSchema(false),
cs.getStatement().getElements().toLogicalSchema(),
cs.getStatement().getProperties().getKafkaTopic(),
cs.getStatement().getProperties().getFormatInfo(),
cs.getConfig(),