Docs for primitive key support (#4478)
* feat: primitive key support

ksqlDB now supports the following primitive key types: `INT`, `BIGINT`, and `DOUBLE`, as well as the existing `STRING` type.

The key type can be defined in the CREATE TABLE or CREATE STREAM statement by including a column definition for `ROWKEY` in the form `ROWKEY <primitive-key-type> KEY,`, for example:

```sql
CREATE TABLE USERS (ROWKEY BIGINT KEY, NAME STRING, RATING DOUBLE) WITH (kafka_topic='users', VALUE_FORMAT='json');
```

ksqlDB currently requires the name of the key column to be `ROWKEY`. Support for arbitrary key names is tracked by #3536.

ksqlDB currently requires keys to use the `KAFKA` format. Support for additional formats is tracked by https://github.com/confluentinc/ksql/projects/3.

Schema inference currently works only with `STRING` keys. Support for additional key types is tracked by #4462. (Schema inference is where ksqlDB infers the schema of a CREATE TABLE or CREATE STREAM statement from the schema registered in the Schema Registry, as opposed to the user supplying the set of columns in the statement.)

Apache Kafka Connect can be configured to output keys in the `KAFKA` format by using a Converter, for example `"key.converter": "org.apache.kafka.connect.converters.IntegerConverter"`. Details of which converter to use for each key type can be found in the `Connect Converter` column at https://docs.confluent.io/current/ksql/docs/developer-guide/serialization.html#kafka.
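
For example, with ksqlDB's embedded Connect support, a source connector can be configured to produce an `INT` key. This is a sketch only: the connector class, connection settings, and column names are illustrative, and the single-message transforms copy the `user_id` field into the record key before the `IntegerConverter` writes it in the `KAFKA` format.

```sql
CREATE SOURCE CONNECTOR `users_source` WITH (
  'connector.class'           = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'            = 'jdbc:postgresql://localhost:5432/my_db', -- illustrative
  'mode'                      = 'incrementing',
  'incrementing.column.name'  = 'user_id',
  'topic.prefix'              = 'jdbc_',
  -- Copy user_id into the key, then unwrap the struct to a primitive INT:
  'transforms'                = 'copyIdToKey,extractKeyFromStruct',
  'transforms.copyIdToKey.type'           = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.copyIdToKey.fields'         = 'user_id',
  'transforms.extractKeyFromStruct.type'  = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractKeyFromStruct.field' = 'user_id',
  -- Serialize the INT key in the KAFKA format:
  'key.converter'             = 'org.apache.kafka.connect.converters.IntegerConverter'
);
```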

@rmoff has written an introductory blog about primitive keys: https://rmoff.net/2020/02/07/primitive-keys-in-ksqldb/

BREAKING CHANGE: existing queries that perform a PARTITION BY or GROUP BY on a single column of one of the above supported primitive key types will now set the key to the appropriate type, not a `STRING` as previously.
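
For example, a sketch of the new behavior, assuming a hypothetical `pageviews` stream:

```sql
-- Hypothetical source stream with a BIGINT userId column in the value:
CREATE STREAM pageviews (userId BIGINT, url STRING)
  WITH (kafka_topic='pageviews', value_format='json');

-- Previously the repartitioned stream had a STRING ROWKEY;
-- it now has a BIGINT ROWKEY, matching the type of userId:
CREATE STREAM pageviews_by_user AS
  SELECT * FROM pageviews PARTITION BY userId;
```
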
big-andy-coates committed Feb 7, 2020
1 parent 2cd6b37 commit ddf09d7
Showing 21 changed files with 1,297 additions and 714 deletions.
3 changes: 2 additions & 1 deletion docs-md/concepts/events.md
@@ -50,6 +50,7 @@ processor. Usually, an event is called a "row", as if it were a
relational database. Each row is composed of a series of columns. Most columns
represent fields in the value of an event, but there are a few extra columns.
In particular, there are the `ROWKEY` and `ROWTIME` columns that represent the
key and time of the event. These columns are present on every row.
key and time of the event. These system columns are present on every row.
In addition, windowed sources have `WINDOWSTART` and `WINDOWEND` columns.

Page last revised on: {{ git_revision_date }}
6 changes: 4 additions & 2 deletions docs-md/concepts/stream-processing.md
@@ -27,8 +27,10 @@ collection by using the `SELECT` statement on an existing collection. The
result of the inner `SELECT` feeds into the outer declared collection. You
don't need to declare a schema when deriving a new collection, because ksqlDB
infers the column names and types from the inner `SELECT` statement. The
`ROWKEY` and `ROWTIME` fields of each row remain, unless you override them in
the `SELECT` statement.
`ROWKEY` of the row remains the same, unless the query includes either a
`PARTITION BY` or `GROUP BY` clause. The value of the `ROWTIME` column
sets the timestamp of the record written to {{ site.ak }}. The values of system columns
cannot be set in the `SELECT`.
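
As an illustration, a minimal sketch assuming a hypothetical `clicks` stream:

```sql
-- Hypothetical source stream:
CREATE STREAM clicks (userId BIGINT, url STRING)
  WITH (kafka_topic='clicks', value_format='json');

-- Derived stream: ROWKEY is unchanged because there is no PARTITION BY.
CREATE STREAM product_clicks AS
  SELECT userId, url FROM clicks WHERE url LIKE '%product%';

-- Derived stream: PARTITION BY re-keys the rows, so ROWKEY becomes the BIGINT userId.
CREATE STREAM clicks_by_user AS
  SELECT * FROM clicks PARTITION BY userId;
```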

Here are a few examples of deriving between the different collection types.

3 changes: 3 additions & 0 deletions docs-md/developer-guide/create-a-stream.md
@@ -40,6 +40,9 @@ data format is `DELIMITED`. Other options are `Avro`, `JSON` and `KAFKA`.
See [Serialization Formats](serialization.md#serialization-formats) for more
details.

ksqlDB requires keys to have been serialized using {{ site.ak }}'s own serializers or compatible
serializers. ksqlDB supports `INT`, `BIGINT`, `DOUBLE`, and `STRING` key types.
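
For example, a sketch of a stream over a topic whose records are assumed to be keyed with {{ site.ak }}'s `Long` serializer (topic and column names are illustrative):

```sql
-- The 'pageviews' topic is assumed to have BIGINT (long) keys,
-- so ROWKEY is declared with a matching type:
CREATE STREAM pageviews (ROWKEY BIGINT KEY, viewtime BIGINT, url STRING)
  WITH (kafka_topic='pageviews', value_format='json');
```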

In the ksqlDB CLI, paste the following CREATE STREAM statement:

```sql
3 changes: 3 additions & 0 deletions docs-md/developer-guide/create-a-table.md
@@ -45,6 +45,9 @@ data format is `JSON`. Other options are `Avro`, `DELIMITED` and `KAFKA`. For
more information, see
[Serialization Formats](serialization.md#serialization-formats).

ksqlDB requires keys to have been serialized using {{ site.ak }}'s own serializers or compatible
serializers. ksqlDB supports `INT`, `BIGINT`, `DOUBLE`, and `STRING` key types.
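
For example, a sketch of a table over a topic whose records are assumed to be keyed with {{ site.ak }}'s `Integer` serializer (topic and column names are illustrative):

```sql
-- The 'users' topic is assumed to have INT keys,
-- so ROWKEY is declared with a matching type:
CREATE TABLE users (ROWKEY INT KEY, username STRING, registertime BIGINT)
  WITH (kafka_topic='users', value_format='json');
```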

In the ksqlDB CLI, paste the following CREATE TABLE statement:

```sql
104 changes: 58 additions & 46 deletions docs-md/developer-guide/joins/partition-data.md
@@ -16,67 +16,70 @@ records based on the joining column. To ensure that records with the same
join column are co-located on the same stream task, the join column must
coincide with the column that the sources are partitioned by.

Primary key
-----------
Keys
----

A *primary key*, when present, defines the partitioning column. Tables are
A *key*, when present, defines the partitioning column. Tables are
always partitioned by their primary key, and ksqlDB doesn't allow repartitioning
of tables, so you can only use a table's primary key as a join column.

Streams, on the other hand, may not have a defined key or may have a key that
differs from the join column. In these cases, ksqlDB repartitions the stream,
which implicitly defines a primary key for it. The primary keys for streams
and tables are of data type `VARCHAR`.
differs from the join column. In these cases, ksqlDB internally repartitions
the stream, which implicitly defines a key for it.

For primary keys to match, they must have the same serialization format. For
example, you can't join a `VARCHAR` key encoded as JSON with one encoded as
AVRO.

!!! note
    ksqlDB requires that keys are encoded as UTF-8 strings.
    ksqlDB requires keys to use the `KAFKA` format. For more information, see
    [Serialization Formats](serialization.md#serialization-formats). When ksqlDB
    repartitions internally, it writes the key in the correct format.

Because you can only use the primary key of a table as a joining column, it's
important to understand how keys are defined. For both streams and tables, the
column that represents the primary key has the name `ROWKEY`.
column that represents the key has the name `ROWKEY`.

When you create a table by using a CREATE TABLE statement, the key of the
table is the same as that of the records in the underlying Kafka topic.
You must set the type of the `ROWKEY` column in the
CREATE TABLE statement to match the key data in the underlying {{ site.ak }} topic.
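
For example, if the records in the underlying topic were keyed with {{ site.ak }}'s `Long` serializer, declare the key as `BIGINT` (a sketch; names are illustrative):

```sql
CREATE TABLE profiles (ROWKEY BIGINT KEY, fullName STRING)
  WITH (kafka_topic='profiles', value_format='json');
```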

When you create a table by using a CREATE TABLE AS SELECT statement, the key of
the resulting table is determined as follows:

- If the FROM clause is a single source, and the source is a stream, the
statement must have a GROUP BY clause, where the grouping columns determine
the key of the resulting table.
- If the single source is a table, the key is copied over from the key of the
table in the FROM clause. If the FROM clause is a join, the primary key of the
resulting table is the joining column, since joins are allowed only on keys.
- If the statement contains a GROUP BY, the key of the resulting table
comprises the grouping columns.

When the primary key consists of multiple columns, like when it's created as
the result of a GROUP BY clause with multiple grouping columns, you must use
ROWKEY as the joining column. Even when the primary key consists of a single
column, we recommend using ROWKEY as the joining column to avoid confusion.

The following example shows a `users` table joined with a `clicks` stream
- If the FROM clause contains a stream, the statement must have a GROUP BY clause,
and the grouping columns determine the key of the resulting table.
  - When grouping by a single column or expression, the type of `ROWKEY` in the
    resulting table matches the type of the column or expression.
  - When grouping by multiple columns or expressions, the type of `ROWKEY` in the
    resulting table is a `STRING` (see the sketch after this list).
- If the FROM clause contains only tables and no GROUP BY clause, the key is
copied over from the key of the table(s) in the FROM clause.
- If the FROM clause contains only tables and has a GROUP BY clause, the
grouping columns determine the key of the resulting table.
  - When grouping by a single column or expression, the type of `ROWKEY` in the
    resulting table matches the type of the column or expression.
  - When grouping by multiple columns or expressions, the type of `ROWKEY` in the
    resulting table is a `STRING`.
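
A minimal sketch of the two grouping cases, assuming a hypothetical `orders` stream (names are illustrative):

```sql
-- Hypothetical source stream:
CREATE STREAM orders (itemId INT, region STRING, quantity INT)
  WITH (kafka_topic='orders', value_format='json');

-- Single grouping column: ROWKEY of the resulting table is an INT.
CREATE TABLE orders_by_item AS
  SELECT itemId, SUM(quantity) AS total FROM orders GROUP BY itemId;

-- Multiple grouping columns: ROWKEY of the resulting table is a STRING.
CREATE TABLE orders_by_item_and_region AS
  SELECT itemId, region, SUM(quantity) AS total FROM orders GROUP BY itemId, region;
```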

The following example shows a `users` table joined with a `clicks` stream
on the `userId` column. The `users` table has the correct primary key
`userId` that coincides with the joining column. But the `clicks` stream
doesn't have a defined key, and ksqlDB must repartition it on the joining
column (`userId`) and assign the primary key before performing the join.
column (`userId`) and assign the key before performing the join.

```sql
-- clicks stream, with an unknown key.
-- the schema of stream clicks is: ROWTIME | ROWKEY | USERID | URL
CREATE STREAM clicks (userId STRING, url STRING) WITH(kafka_topic='clickstream', value_format='json');
-- the schema of stream clicks is: ROWTIME BIGINT | ROWKEY STRING | USERID BIGINT | URL STRING
CREATE STREAM clicks (userId BIGINT, url STRING) WITH(kafka_topic='clickstream', value_format='json');

-- the primary key of table users becomes userId that is the key of the records topic:
-- the schema of table users is: ROWTIME | ROWKEY | USERID | URL
CREATE TABLE users (userId STRING, fullName STRING) WITH(kafka_topic='users', value_format='json', key='userId');
-- the primary key of table users is a BIGINT.
-- The userId column in the value matches the key, so it can be used as an alias for ROWKEY in queries to make them more readable.
-- the schema of table users is: ROWTIME BIGINT | ROWKEY BIGINT | USERID BIGINT | FULLNAME STRING
CREATE TABLE users (ROWKEY BIGINT KEY, userId BIGINT, fullName STRING) WITH(kafka_topic='users', value_format='json', key='userId');

-- join using primary key of table users with newly assigned key of stream clicks
-- join of users table with clicks stream, joining on the table's primary key alias and the stream's userId column:
-- the join will automatically repartition the clicks stream:
SELECT clicks.url, users.fullName FROM clicks JOIN users ON clicks.ROWKEY = users.ROWKEY;
SELECT clicks.url, users.fullName FROM clicks JOIN users ON clicks.userId = users.userId;

-- The following is equivalent and does not rely on there being a copy of the table's key within the value schema:
SELECT clicks.url, users.fullName FROM clicks JOIN users ON clicks.userId = users.ROWKEY;
```

Co-partitioning Requirements
@@ -87,7 +90,7 @@ and tables are *co-partitioned*, which means that input records on both sides
of the join have the same configuration settings for partitions.

- The input records for the join must have the
[same keying scheme](#records-have-the-same-keying-scheme).
[same keying schema](#records-have-the-same-keying-schema).
- The input records must have the
[same number of partitions](#records-have-the-same-number-of-partitions)
on both sides.
@@ -98,26 +101,35 @@ When your inputs are co-partitioned, records with the same key, from
both sides of the join, are delivered to the same stream task during
processing.

### Records Have the Same Keying Scheme
### Records Have the Same Keying Schema

For a join to work, the keys from both sides must have the same serialized
binary data.
For a join to work, the keys from both sides must have the same SQL type.

For example, you can join a stream of user clicks that's keyed on a `VARCHAR`
user id with a table of user profiles that's also keyed on a `VARCHAR` user id.
Records with the exact same user id on both sides will be joined.

ksqlDB requires that keys are UTF-8 encoded strings.
If the schemas of the columns you wish to join on don't match, it may be possible
to `CAST` one side to match the other. For example, if one side of the join
has an `INT` userId column, and the other a `BIGINT`, you can cast
the `INT` side to a `BIGINT`:

```sql
-- stream with INT userId
CREATE STREAM clicks (userId INT, url STRING) WITH(kafka_topic='clickstream', value_format='json');

-- table with BIGINT userId stored in the key:
CREATE TABLE users (ROWKEY BIGINT KEY, fullName STRING) WITH(kafka_topic='users', value_format='json');

-- Join using a CAST to convert the left side's join column to match the right side's type.
SELECT clicks.url, users.fullName FROM clicks JOIN users ON CAST(clicks.userId AS BIGINT) = users.ROWKEY;
```

!!! note
A join depends on the key's underlying serialization format. For example,
no join occurs on a VARCHAR column that's encoded as JSON with a VARCHAR
column that's encoded as AVRO.

Tables created on top of existing Kafka topics, for example those created with
a `CREATE TABLE` statement, are keyed on the data held in the key of the records
in the Kafka topic. ksqlDB presents this data in the `ROWKEY` column and expects
the data to be a `VARCHAR`.
the data to be in the `KAFKA` format.

Tables created inside ksqlDB from other sources, for example those created with
a `CREATE TABLE AS SELECT` statement, will copy the key from their source(s)
8 changes: 2 additions & 6 deletions docs-md/developer-guide/ksqldb-reference/print.md
@@ -13,18 +13,14 @@ Synopsis
--------

```sql
PRINT qualifiedName [FROM BEGINNING] [INTERVAL interval] [LIMIT limit]
PRINT topicName [FROM BEGINNING] [INTERVAL interval] [LIMIT limit]
```

Description
-----------

Print the contents of Kafka topics to the ksqlDB CLI.

!!! important
SQL grammar defaults to uppercase formatting. You can use quotations
(`"`) to print topics that contain lowercase characters.

The PRINT statement supports the following properties:

| Property | Description |
@@ -40,7 +36,7 @@ The following statement shows how to print all of the records in a topic named
`ksql__commands`.

```sql
PRINT 'ksql__commands' FROM BEGINNING;
PRINT ksql__commands FROM BEGINNING;
```

Your output should resemble:
16 changes: 10 additions & 6 deletions docs-md/developer-guide/ksqldb-reference/select-push-query.md
@@ -48,7 +48,7 @@ In the previous statements, `from_item` is one of the following:
- `from_item LEFT JOIN from_item ON join_condition`

The WHERE clause can refer to any column defined for a stream or table,
including the two implicit columns `ROWTIME` and `ROWKEY`.
including the system columns `ROWTIME` and `ROWKEY`.

Example
-------
@@ -109,8 +109,12 @@ SET 'auto.offset.reset' = 'earliest';

The WINDOW clause lets you control how to group input records *that have
the same key* into so-called *windows* for operations like aggregations
or joins. Windows are tracked per record key. ksqlDB supports the following
WINDOW types.
or joins. Windows are tracked per record key.

Windowing adds two additional system columns to the data, which provide
the window bounds: `WINDOWSTART` and `WINDOWEND`.

ksqlDB supports the following WINDOW types.

**TUMBLING**: Tumbling windows group input records into fixed-sized,
non-overlapping windows based on the records' timestamps. You must
@@ -122,7 +126,7 @@ The following statement shows how to create a push query that has a tumbling
window.

```sql
SELECT item_id, SUM(quantity)
SELECT windowstart, windowend, item_id, SUM(quantity)
FROM orders
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY item_id
@@ -138,7 +142,7 @@ The following statement shows how to create a push query that has a hopping
window.

```sql
SELECT item_id, SUM(quantity)
SELECT windowstart, windowend, item_id, SUM(quantity)
FROM orders
WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS)
GROUP BY item_id
@@ -157,7 +161,7 @@ The following statement shows how to create a push query that has a session
window.

```sql
SELECT item_id, SUM(quantity)
SELECT windowstart, windowend, item_id, SUM(quantity)
FROM orders
WINDOW SESSION (20 SECONDS)
GROUP BY item_id
2 changes: 2 additions & 0 deletions docs-md/developer-guide/serialization.md
@@ -14,6 +14,8 @@ The primary mechanism is by choosing the serialization format when you
create a stream or table and specify the `VALUE_FORMAT` in the `WITH`
clause.

While ksqlDB supports a variety of value formats, it requires keys to be in the `KAFKA` format.

```sql
CREATE TABLE ORDERS (F0 INT, F1 STRING) WITH (VALUE_FORMAT='JSON', ...);
```
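
In contrast, the key format is not configurable per collection. A minimal sketch, assuming the topic's keys were written with {{ site.ak }}'s own `Long` serializer and an illustrative topic name:

```sql
-- The key is always in the KAFKA format; here the BIGINT ROWKEY must have been
-- written with Kafka's own long serializer:
CREATE TABLE ORDERS (ROWKEY BIGINT KEY, F0 INT, F1 STRING)
  WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='orders');
```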
