
feat: drop WITH(KEY) syntax (#5363)
* feat: drop WITH(KEY) syntax

fixes: #3537

implements: [KLIP-25](https://github.com/confluentinc/ksql/blob/master/design-proposals/klip-25-removal-of-with-key-syntax.md)

This change removes the `WITH(KEY)` syntax, which previously let users designate a value column to act as an alias for the key column. This allowed a more user-friendly name for the key column, at the expense of requiring a copy of the key data in the value.

With the new 'any key name' feature, the key columns themselves can be given appropriate names, removing the need for this aliasing functionality. See [KLIP-25](https://github.com/confluentinc/ksql/blob/master/design-proposals/klip-25-removal-of-with-key-syntax.md) for more details.
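
For reference, a minimal before/after sketch of the migration, based on the `publications` example updated in the docs below:

```sql
-- Before (no longer valid): the key was named via the WITH(KEY) property,
-- which required the author data to appear in the value as well as the key.
CREATE STREAM publications (author VARCHAR, title VARCHAR)
  WITH (kafka_topic = 'publication_events',
        value_format = 'avro',
        key = 'author');

-- After: the key column is declared directly in the schema with KEY
-- (or PRIMARY KEY for tables), so no copy in the value is needed.
CREATE STREAM publications (author VARCHAR KEY, title VARCHAR)
  WITH (kafka_topic = 'publication_events',
        value_format = 'avro');
```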

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
big-andy-coates committed May 18, 2020
1 parent a56c54a commit bb43d23
Showing 284 changed files with 17,000 additions and 4,034 deletions.
3 changes: 1 addition & 2 deletions docs/concepts/collections/streams.md
@@ -26,10 +26,9 @@ in the `publications` stream are distributed over 3 partitions, are keyed on
the `author` column, and are serialized in the Avro format.

```sql
-CREATE STREAM publications (author VARCHAR, title VARCHAR)
+CREATE STREAM publications (author VARCHAR KEY, title VARCHAR)
  WITH (kafka_topic = 'publication_events',
        partitions = 3,
-       key = 'author',
        value_format = 'avro');
```

3 changes: 1 addition & 2 deletions docs/concepts/collections/tables.md
@@ -42,9 +42,8 @@ partitions, are keyed on the `title` column, and are serialized in the Avro
format.

```sql
-CREATE TABLE movies (title VARCHAR, release_year INT)
+CREATE TABLE movies (title VARCHAR PRIMARY KEY, release_year INT)
  WITH (kafka_topic = 'movies',
-       key = 'title',
        partitions = 5,
        value_format = 'avro');
```
22 changes: 12 additions & 10 deletions docs/concepts/stream-processing.md
@@ -63,14 +63,14 @@ Deriving a new table from an existing stream
Given the following table and stream:

```sql
-CREATE TABLE products (product_name VARCHAR, cost DOUBLE)
-    WITH (kafka_topic='products', partitions=1, value_format='json', key='product_name');
+CREATE TABLE products (product_name VARCHAR PRIMARY KEY, cost DOUBLE)
+    WITH (kafka_topic='products', partitions=1, value_format='json');

-CREATE STREAM orders (product_name VARCHAR)
-    WITH (kafka_topic='orders', partitions=1, value_format='json', key='product_name');
+CREATE STREAM orders (product_name VARCHAR KEY)
+    WITH (kafka_topic='orders', partitions=1, value_format='json');
```

-You can create an table that aggregates rows from the `orders` stream, while
+You can create a table that aggregates rows from the `orders` stream, while
also joining the stream on the `products` table to enrich the `orders` data:

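The defining statement itself is collapsed out of this diff view. A hypothetical query consistent with the description above, written in the post-change syntax (the result name `order_metrics` and the aggregate column names are illustrative):

```sql
-- Hypothetical sketch; the actual statement is not visible in this hunk.
-- Joins the orders stream to the products table on the key column,
-- then aggregates per product.
CREATE TABLE order_metrics AS
  SELECT o.product_name, COUNT(*) AS num_orders, SUM(p.cost) AS total_cost
  FROM orders o
    JOIN products p ON o.product_name = p.product_name
  GROUP BY o.product_name
  EMIT CHANGES;
```
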
@@ -109,11 +109,11 @@ Deriving a new stream from multiple streams
Given the following two streams:

```sql
-CREATE STREAM impressions (user VARCHAR, impression_id BIGINT, url VARCHAR)
-    WITH (kafka_topic='impressions', partitions=1, value_format='json', key='user');
+CREATE STREAM impressions (user VARCHAR KEY, impression_id BIGINT, url VARCHAR)
+    WITH (kafka_topic='impressions', partitions=1, value_format='json');

-CREATE STREAM clicks (user VARCHAR)
-    WITH (kafka_topic='clicks', partitions=1, value_format='json', key='user');
+CREATE STREAM clicks (user VARCHAR KEY, url VARCHAR)
+    WITH (kafka_topic='clicks', partitions=1, value_format='json');
```

You can create a derived stream that joins the `impressions` and `clicks`
@@ -122,7 +122,9 @@ within one minute of the initial ad impression:

```sql
CREATE STREAM clicked_impressions AS
-  SELECT * FROM impressions i JOIN clicks c WITHIN 1 minute ON i.user = c.user EMIT CHANGES;
+  SELECT * FROM impressions i JOIN clicks c WITHIN 1 minute ON i.user = c.user
+  WHERE i.url = c.url
+  EMIT CHANGES;
```

Any time an `impressions` row is received, followed within one minute by a
76 changes: 35 additions & 41 deletions docs/developer-guide/create-a-stream.md
@@ -98,30 +98,34 @@
```
Name : PAGEVIEWS
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
```

+You may notice that ksqlDB has added a key column named `ROWKEY`.
+This is the default key column that ksqlDB adds if you don't provide one.
+If your data doesn't contain a {{ site.ak }} serialized
+`STRING` in the {{ site.ak }} message key, don't use `ROWKEY` in your SQL statements,
+because this may cause unexpected results.

### Create a Stream with a Specified Key

-The previous SQL statement makes no assumptions about the Kafka message
-key in the underlying Kafka topic. If the value of the message key in
-the topic is the same as one of the columns defined in the stream, you
-can specify the key in the WITH clause of the CREATE STREAM statement.
-If you use this column name later to perform a join or a repartition, ksqlDB
-knows that no repartition is needed. In effect, the named column becomes an
-alias for ROWKEY.
+The previous SQL statement doesn't define a column to represent the data in the
+{{ site.ak }} message key in the underlying {{ site.ak }} topic, so the system adds a
+`ROWKEY` column with type `STRING`.

+If the {{ site.ak }} message key is serialized in a key format that ksqlDB supports (currently `KAFKA`),
+you can specify the key in the column list of the CREATE STREAM statement.

-For example, if the Kafka message key has the same value as the `pageid`
-column, you can write the CREATE STREAM statement like this:
+For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the `viewtime`,
+so you can write the CREATE STREAM statement like this:

```sql
CREATE STREAM pageviews_withkey
-  (viewtime BIGINT,
+  (viewtime BIGINT KEY,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
-       VALUE_FORMAT='DELIMITED',
-       KEY='pageid');
+       VALUE_FORMAT='DELIMITED');
```

-Confirm that the KEY field in the new stream is `pageid` by using the
+Confirm that the KEY column in the new stream is `viewtime` by using the
DESCRIBE EXTENDED statement:

```sql
[...]
```

@@ -133,11 +137,17 @@ Your output should resemble:
```
Name : PAGEVIEWS_WITHKEY
Type : STREAM
-Key field : PAGEID
-Key format : STRING
Timestamp field : Not set - using <ROWTIME>
+Key format : KAFKA
Value format : DELIMITED
Kafka topic : pageviews (partitions: 1, replication: 1)
+Field | Type
+--------------------------------------
+ VIEWTIME | BIGINT (Key)
+ USERID | VARCHAR(STRING)
+ PAGEID | VARCHAR(STRING)
+--------------------------------------
[...]
```

@@ -155,12 +165,11 @@ like this:

```sql
CREATE STREAM pageviews_timestamped
-  (viewtime BIGINT,
+  (viewtime BIGINT KEY,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
       VALUE_FORMAT='DELIMITED',
-       KEY='pageid',
       TIMESTAMP='viewtime')
```

@@ -176,9 +185,8 @@ Your output should resemble:
```
Name : PAGEVIEWS_TIMESTAMPED
Type : STREAM
-Key field : PAGEID
-Key format : STRING
Timestamp field : VIEWTIME
+Key format : KAFKA
Value format : DELIMITED
Kafka topic : pageviews (partitions: 1, replication: 1)
[...]
Expand All @@ -197,7 +205,7 @@ the following CREATE STREAM statement into the CLI:

```sql
CREATE STREAM pageviews
-(viewtime BIGINT,
+(viewtime BIGINT KEY,
userid VARCHAR,
pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews',
[...]
```

@@ -242,12 +250,6 @@ them explicitly.
To stream the result of a SELECT query into an *existing* stream and its
underlying topic, use the INSERT INTO statement.

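For illustration only (this sketch is not part of the commit), an INSERT INTO query that routes rows from one stream into an existing one, reusing names from the example below:

```sql
-- Hypothetical sketch: continuously insert matching pageviews rows
-- into the existing pageviews_intro stream.
INSERT INTO pageviews_intro
  SELECT * FROM pageviews
  WHERE pageid < 'Page_20'
  EMIT CHANGES;
```
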
-!!! note
-    The CREATE STREAM AS SELECT statement doesn't support the KEY property.
-    To specify a KEY field, use the PARTITION BY clause. For more
-    information, see
-    [Partition Data to Enable Joins](joins/partition-data.md).

The following SQL statement creates a `pageviews_intro` stream that
contains results from a persistent query that matches "introductory"
pages that have a `pageid` value that's less than `Page_20`:
@@ -280,12 +282,12 @@ Your output should resemble:
```
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: KAFKA_STRING
-rowtime: 10/30/18 10:15:51 PM GMT, key: 294851, value: 1540937751186,User_8,Page_12
-rowtime: 10/30/18 10:15:55 PM GMT, key: 295051, value: 1540937755255,User_1,Page_15
-rowtime: 10/30/18 10:15:57 PM GMT, key: 295111, value: 1540937757265,User_8,Page_10
-rowtime: 10/30/18 10:15:59 PM GMT, key: 295221, value: 1540937759330,User_4,Page_15
-rowtime: 10/30/18 10:15:59 PM GMT, key: 295231, value: 1540937759699,User_1,Page_12
-rowtime: 10/30/18 10:15:59 PM GMT, key: 295241, value: 1540937759990,User_6,Page_15
+rowtime: 10/30/18 10:15:51 PM GMT, key: 1540937751186, value: 1540937751186,User_8,Page_12
+rowtime: 10/30/18 10:15:55 PM GMT, key: 1540937755255, value: 1540937755255,User_1,Page_15
+rowtime: 10/30/18 10:15:57 PM GMT, key: 1540937757265, value: 1540937757265,User_8,Page_10
+rowtime: 10/30/18 10:15:59 PM GMT, key: 1540937759330, value: 1540937759330,User_4,Page_15
+rowtime: 10/30/18 10:15:59 PM GMT, key: 1540937759699, value: 1540937759699,User_1,Page_12
+rowtime: 10/30/18 10:15:59 PM GMT, key: 1540937759990, value: 1540937759990,User_6,Page_15
^CTopic printing ceased
```

@@ -294,13 +296,6 @@ Press Ctrl+C to stop printing the stream.
!!! note
The query continues to run after you stop printing the stream.

-!!! note
-    KsqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`.
-    KsqlDB has not narrowed it further because it is not possible to rule out
-    either format just by inspecting the key's serialized bytes. In this case we know the key is
-    a `BIGINT`. For other cases you may know the key type or you may need to speak to the author
-    of the data.

Use the SHOW QUERIES statement to view the query that ksqlDB created for
the `pageviews_intro` stream:

@@ -364,4 +359,3 @@
Next Steps
----------

- [Join Event Streams with ksqlDB](joins/join-streams-and-tables.md)

69 changes: 33 additions & 36 deletions docs/developer-guide/create-a-table.md
@@ -52,13 +52,12 @@ In the ksqlDB CLI, paste the following CREATE TABLE statement:

```sql
CREATE TABLE users
-  (registertime BIGINT,
-   userid VARCHAR,
+  (userid VARCHAR PRIMARY KEY,
+   registertime BIGINT,
   gender VARCHAR,
   regionid VARCHAR)
  WITH (KAFKA_TOPIC = 'users',
-       VALUE_FORMAT='JSON',
-       KEY = 'userid');
+       VALUE_FORMAT='JSON');
```

Your output should resemble:
@@ -97,9 +96,8 @@ Your output should resemble:

```
Name : USERS
Field | Type
------------------------------------------
-ROWKEY | VARCHAR(STRING) (key)
+USERID | VARCHAR(STRING) (key)
REGISTERTIME | BIGINT
-USERID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
------------------------------------------
```

@@ -116,13 +114,13 @@
```sql
SELECT * FROM users EMIT CHANGES;
```
Assuming the table has content, your output should resemble:

```
-+--------+---------------+--------+--------+----------+
-| ROWKEY | REGISTERTIME  | USERID | GENDER | REGIONID |
-+--------+---------------+--------+--------+----------+
-| User_2 | 1498028899054 | User_2 | MALE   | Region_1 |
-| User_6 | 1505677113995 | User_6 | FEMALE | Region_7 |
-| User_5 | 1491338621627 | User_5 | OTHER  | Region_2 |
-| User_9 | 1492621173463 | User_9 | FEMALE | Region_3 |
++--------+---------------+--------+----------+
+| USERID | REGISTERTIME  | GENDER | REGIONID |
++--------+---------------+--------+----------+
+| User_2 | 1498028899054 | MALE   | Region_1 |
+| User_6 | 1505677113995 | FEMALE | Region_7 |
+| User_5 | 1491338621627 | OTHER  | Region_2 |
+| User_9 | 1492621173463 | FEMALE | Region_3 |
^CQuery terminated
```

@@ -144,15 +142,14 @@ the following CREATE TABLE statement into the CLI:

```sql
CREATE TABLE users
-  (registertime BIGINT,
-   userid VARCHAR,
+  (userid VARCHAR PRIMARY KEY,
+   registertime BIGINT,
   gender VARCHAR,
   regionid VARCHAR)
  WITH (KAFKA_TOPIC = 'users',
       VALUE_FORMAT='JSON',
       PARTITIONS=4,
-      REPLICAS=3
-      KEY = 'userid');
+      REPLICAS=3);
```

This creates the `users` topic for you with the supplied partition and replica counts.
@@ -216,9 +213,9 @@ Your output should resemble:
```
Key format: KAFKA_STRING
Value format: JSON
-rowTime: 12/21/18 23:58:42 PM PSD, key: User_5, value: {"USERID":"User_5","GENDER":"FEMALE","REGIONID":"Region_4"}
-rowTime: 12/21/18 23:58:42 PM PSD, key: User_2, value: {"USERID":"User_2","GENDER":"FEMALE","REGIONID":"Region_7"}
-rowTime: 12/21/18 23:58:42 PM PSD, key: User_9, value: {"USERID":"User_9","GENDER":"FEMALE","REGIONID":"Region_4"}
+rowTime: 12/21/18 23:58:42 PM PSD, key: User_5, value: {"GENDER":"FEMALE","REGIONID":"Region_4"}
+rowTime: 12/21/18 23:58:42 PM PSD, key: User_2, value: {"GENDER":"FEMALE","REGIONID":"Region_7"}
+rowTime: 12/21/18 23:58:42 PM PSD, key: User_9, value: {"GENDER":"FEMALE","REGIONID":"Region_4"}
^CTopic printing ceased
```

@@ -282,17 +279,17 @@
```sql
SELECT ROWTIME, * FROM pageviews_table EMIT CHANGES;
```
Your output should resemble:

```
-+---------------+---------------+---------------+------------------+--------+---------+------+
-| ROWTIME       | WINDOWSTART   | WINDOWEND     | ROWKEY           | USERID | PAGEID  | TOTAL|
-+---------------+---------------+---------------+------------------+--------+---------+------+
-| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1    |
-| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1    |
-| 1557183930211 | 1557183900000 | 1557183960000 | User_1|+|Page_79 | User_1 | Page_79 | 1    |
-| 1557183930687 | 1557183900000 | 1557183960000 | User_9|+|Page_34 | User_9 | Page_34 | 1    |
-| 1557183929786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 2    |
-| 1557183931095 | 1557183900000 | 1557183960000 | User_3|+|Page_43 | User_3 | Page_43 | 1    |
-| 1557183930184 | 1557183900000 | 1557183960000 | User_1|+|Page_29 | User_1 | Page_29 | 1    |
-| 1557183930727 | 1557183900000 | 1557183960000 | User_6|+|Page_93 | User_6 | Page_93 | 3    |
++---------------+---------------+---------------+------------------+------+
+| ROWTIME       | WINDOWSTART   | WINDOWEND     | KSQL_COL_0       | TOTAL|
++---------------+---------------+---------------+------------------+------+
+| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | 1    |
+| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | 1    |
+| 1557183930211 | 1557183900000 | 1557183960000 | User_1|+|Page_79 | 1    |
+| 1557183930687 | 1557183900000 | 1557183960000 | User_9|+|Page_34 | 1    |
+| 1557183929786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | 2    |
+| 1557183931095 | 1557183900000 | 1557183960000 | User_3|+|Page_43 | 1    |
+| 1557183930184 | 1557183900000 | 1557183960000 | User_1|+|Page_29 | 1    |
+| 1557183930727 | 1557183900000 | 1557183960000 | User_6|+|Page_93 | 3    |
^CQuery terminated
```

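The statement that defined `pageviews_table` is collapsed out of this diff view. A hypothetical definition consistent with the output above, assuming the post-change behavior in which a multi-column GROUP BY produces a single system-named key column (hence `KSQL_COL_0` in place of `ROWKEY`, `USERID`, and `PAGEID`; the source stream name is also assumed):

```sql
-- Hypothetical sketch; the actual statement is not visible in this hunk.
-- The two grouping expressions collapse into one synthetic key column,
-- which ksqlDB names KSQL_COL_0 and fills with values like 'User_9|+|Page_39'.
CREATE TABLE pageviews_table AS
  SELECT userid, pageid, COUNT(*) AS total
  FROM pageviews
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY userid, pageid
  EMIT CHANGES;
```
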
@@ -304,16 +301,16 @@ Look up the value for a specific key within the table by using a SELECT
statement.

```sql
-SELECT * FROM pageviews_table WHERE ROWKEY='User_9|+|Page_39';
+SELECT * FROM pageviews_table WHERE KSQL_COL_0='User_9|+|Page_39';
```

Your output should resemble:

```
-+------------------+---------------+---------------+--------+---------+-------+
-| ROWKEY           | WINDOWSTART   | WINDOWEND     | USERID | PAGEID  | TOTAL |
-+------------------+---------------+---------------+--------+---------+-------+
-| User_9|+|Page_39 | 1557183900000 | 1557183960000 | User_9 | Page_39 | 1     |
++------------------+---------------+---------------+--------+
+| KSQL_COL_0       | WINDOWSTART   | WINDOWEND     | TOTAL  |
++------------------+---------------+---------------+--------+
+| User_9|+|Page_39 | 1557183900000 | 1557183960000 | 1      |
Query terminated
```
