feat: drop WITH(KEY) syntax #5363

Merged · 21 commits · May 18, 2020
This view shows changes from 18 of the 21 commits.

Commits (all by big-andy-coates):

- a807f83 feat: drop WITH(KEY) syntax (May 14, 2020)
- a9d5f1b test: test updates (May 14, 2020)
- 283679a test: historic plans (May 14, 2020)
- d4c7d68 docs: doc updates (May 14, 2020)
- 3b95f87 Update docs/developer-guide/joins/partition-data.md (May 14, 2020)
- 6aa7f70 Update docs/developer-guide/joins/partition-data.md (May 14, 2020)
- 3a74a30 Update docs/developer-guide/joins/partition-data.md (May 14, 2020)
- 0e616ff Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- 7c15340 Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- adc8594 Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- 4a7322c Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- d2da5eb Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- 7390a34 Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- dbf9677 Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- 5acab89 Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- 058bfd3 Update docs/developer-guide/create-a-stream.md (May 14, 2020)
- 2559bc7 chore: merge from master (May 15, 2020)
- 10f1b9a Merge branch 'remove_key_field' of github.com:big-andy-coates/ksql in… (May 15, 2020)
- 2b809e3 Merge branch 'master' into remove_key_field (May 15, 2020)
- 0362ad1 chore: merge from master (May 18, 2020)
- ebc8053 chore: disable spotbugs red herring (May 18, 2020)
3 changes: 1 addition & 2 deletions docs/concepts/collections/streams.md
@@ -26,10 +26,9 @@ in the `publications` stream are distributed over 3 partitions, are keyed on
the `author` column, and are serialized in the Avro format.

```sql
-CREATE STREAM publications (author VARCHAR, title VARCHAR)
+CREATE STREAM publications (author VARCHAR KEY, title VARCHAR)
WITH (kafka_topic = 'publication_events',
partitions = 3,
-key = 'author',
value_format = 'avro');
```
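For illustration only (this statement is not part of the diff): with the key declared inline, the `author` column is written to and read from the {{ site.ak }} message key. A hypothetical sketch, assuming the stream above exists:

```sql
-- Hypothetical sketch: author populates the message key, title the message value.
INSERT INTO publications (author, title) VALUES ('C.S. Lewis', 'The Silver Chair');
```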

3 changes: 1 addition & 2 deletions docs/concepts/collections/tables.md
@@ -42,9 +42,8 @@ partitions, are keyed on the `title` column, and are serialized in the Avro
format.

```sql
-CREATE TABLE movies (title VARCHAR, release_year INT)
+CREATE TABLE movies (title VARCHAR PRIMARY KEY, release_year INT)
WITH (kafka_topic = 'movies',
-key = 'title'
partitions = 5,
value_format = 'avro');
```
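Declared this way, `title` is sourced straight from the Kafka message key. A hypothetical push query (illustrative only, assuming the table above exists):

```sql
-- Hypothetical sketch: title is read from the Kafka message key.
SELECT title, release_year FROM movies EMIT CHANGES;
```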
22 changes: 12 additions & 10 deletions docs/concepts/stream-processing.md
@@ -63,14 +63,14 @@ Deriving a new table from an existing stream
Given the following table and stream:

```sql
-CREATE TABLE products (product_name VARCHAR, cost DOUBLE)
-WITH (kafka_topic='products', partitions=1, value_format='json', key='product_name');
+CREATE TABLE products (product_name VARCHAR PRIMARY KEY, cost DOUBLE)
+WITH (kafka_topic='products', partitions=1, value_format='json');

-CREATE STREAM orders (product_name VARCHAR)
-WITH (kafka_topic='orders', partitions=1, value_format='json', key='product_name');
+CREATE STREAM orders (product_name VARCHAR KEY)
+WITH (kafka_topic='orders', partitions=1, value_format='json');
```

-You can create an table that aggregates rows from the `orders` stream, while
+You can create a table that aggregates rows from the `orders` stream, while
also joining the stream on the `products` table to enrich the `orders` data:

```sql
-- aggregation query collapsed in the diff view
```
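The query body itself is collapsed in this diff view. A purely hypothetical sketch of such an aggregating join (the table name and columns below are assumed, not taken from the PR):

```sql
-- Hypothetical sketch: aggregate the orders stream, enriched by a join on products.
CREATE TABLE order_totals AS
  SELECT p.product_name,
         COUNT(*) AS order_count,
         SUM(p.cost) AS total_cost
  FROM orders o
  JOIN products p ON o.product_name = p.product_name
  GROUP BY p.product_name
  EMIT CHANGES;
```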

@@ -109,11 +109,11 @@ Deriving a new stream from multiple streams
Given the following two streams:

```sql
-CREATE STREAM impressions (user VARCHAR, impression_id BIGINT, url VARCHAR)
-WITH (kafka_topic='impressions', partitions=1, value_format='json', key='user');
+CREATE STREAM impressions (user VARCHAR KEY, impression_id BIGINT, url VARCHAR)
+WITH (kafka_topic='impressions', partitions=1, value_format='json');

-CREATE STREAM clicks (user VARCHAR)
-WITH (kafka_topic='clicks', partitions=1, value_format='json', key='user');
+CREATE STREAM clicks (user VARCHAR KEY, url VARCHAR)
+WITH (kafka_topic='clicks', partitions=1, value_format='json');
```

You can create a derived stream that joins the `impressions` and `clicks`
@@ -122,7 +122,7 @@ within one minute of the initial ad impression:

```sql
CREATE STREAM clicked_impressions AS
-SELECT * FROM impressions i JOIN clicks c WITHIN 1 minute ON i.user = c.user EMIT CHANGES;
+SELECT * FROM impressions i JOIN clicks c WITHIN 1 minute ON i.user = c.user
+WHERE i.url = c.url
+EMIT CHANGES;
```

Any time an `impressions` row is received, followed within one minute by a
76 changes: 35 additions & 41 deletions docs/developer-guide/create-a-stream.md
@@ -98,30 +98,34 @@ Name : PAGEVIEWS
```
[...]
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
```

+You may notice that ksqlDB has added a key column named `ROWKEY`.
+This is the default key column that ksqlDB adds if you don't provide one.
+If your data doesn't contain a {{ site.ak }} serialized
+`STRING` in the {{ site.ak }} message key, don't use `ROWKEY` in your SQL statements,
+because this may cause unexpected results.
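To make the default concrete, here is a hypothetical sketch (not from the PR) of a stream declared without a key column; ksqlDB would add `ROWKEY STRING` itself:

```sql
-- Hypothetical sketch: no key column declared, so ksqlDB adds ROWKEY STRING.
CREATE STREAM pageviews_no_key
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        VALUE_FORMAT='DELIMITED');
```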

### Create a Stream with a Specified Key

-The previous SQL statement makes no assumptions about the Kafka message
-key in the underlying Kafka topic. If the value of the message key in
-the topic is the same as one of the columns defined in the stream, you
-can specify the key in the WITH clause of the CREATE STREAM statement.
-If you use this column name later to perform a join or a repartition, ksqlDB
-knows that no repartition is needed. In effect, the named column becomes an
-alias for ROWKEY.
+The previous SQL statement doesn't define a column to represent the data in the
+{{ site.ak }} message key in the underlying {{ site.ak }} topic, so the system adds a
+`ROWKEY` column with type `STRING`.

+If the {{ site.ak }} message key is serialized in a key format that ksqlDB supports (currently `KAFKA`),
+you can specify the key in the column list of the CREATE STREAM statement.

-For example, if the Kafka message key has the same value as the `pageid`
-column, you can write the CREATE STREAM statement like this:
+For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the `viewtime`,
+so you can write the CREATE STREAM statement like this:

```sql
CREATE STREAM pageviews_withkey
-(viewtime BIGINT,
+(viewtime BIGINT KEY,
userid VARCHAR,
pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews',
-VALUE_FORMAT='DELIMITED',
-KEY='pageid');
+VALUE_FORMAT='DELIMITED');
```

-Confirm that the KEY field in the new stream is `pageid` by using the
+Confirm that the KEY column in the new stream is `viewtime` by using the
DESCRIBE EXTENDED statement:

```sql
-- statement collapsed in the diff view
```
@@ -133,11 +137,17 @@ Your output should resemble:
```
Name : PAGEVIEWS_WITHKEY
Type : STREAM
-Key field : PAGEID
-Key format : STRING
Timestamp field : Not set - using <ROWTIME>
+Key format : KAFKA
Value format : DELIMITED
Kafka topic : pageviews (partitions: 1, replication: 1)

+Field | Type
+--------------------------------------
+VIEWTIME | BIGINT (Key)
+USERID | VARCHAR(STRING)
+PAGEID | VARCHAR(STRING)
+--------------------------------------
[...]
```

@@ -155,12 +165,11 @@ like this:

```sql
CREATE STREAM pageviews_timestamped
-(viewtime BIGINT,
+(viewtime BIGINT KEY,
userid VARCHAR,
pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews',
VALUE_FORMAT='DELIMITED',
-KEY='pageid',
TIMESTAMP='viewtime')
```

@@ -176,9 +185,8 @@ Your output should resemble:
```
Name : PAGEVIEWS_TIMESTAMPED
Type : STREAM
-Key field : PAGEID
-Key format : STRING
Timestamp field : VIEWTIME
+Key format : KAFKA
Value format : DELIMITED
Kafka topic : pageviews (partitions: 1, replication: 1)
[...]
```

@@ -197,7 +205,7 @@ the following CREATE STREAM statement into the CLI:

```sql
CREATE STREAM pageviews
-(viewtime BIGINT,
+(viewtime BIGINT KEY,
userid VARCHAR,
pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews',
[...]
```

@@ -242,12 +250,6 @@ them explicitly.
To stream the result of a SELECT query into an *existing* stream and its
underlying topic, use the INSERT INTO statement.
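As a hypothetical illustration (not part of this PR's changes, and assuming the target stream already exists with a matching schema), such an INSERT INTO might look like:

```sql
-- Hypothetical sketch: append the results of a query to an existing stream.
INSERT INTO pageviews_intro
  SELECT * FROM pageviews
  WHERE pageid < 'Page_20'
  EMIT CHANGES;
```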

-!!! note
-    The CREATE STREAM AS SELECT statement doesn't support the KEY property.
-    To specify a KEY field, use the PARTITION BY clause. For more
-    information, see
-    [Partition Data to Enable Joins](joins/partition-data.md).
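The note above goes away along with the WITH(KEY) syntax, but PARTITION BY remains the way to set the key of a derived stream. A hypothetical sketch (the stream name below is assumed):

```sql
-- Hypothetical sketch: PARTITION BY sets the key column of the derived stream.
CREATE STREAM pageviews_by_page AS
  SELECT * FROM pageviews
  PARTITION BY pageid
  EMIT CHANGES;
```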

The following SQL statement creates a `pageviews_intro` stream that
contains results from a persistent query that matches "introductory"
pages that have a `pageid` value that's less than `Page_20`:
@@ -280,12 +282,12 @@ Your output should resemble:
```
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: KAFKA_STRING
-rowtime: 10/30/18 10:15:51 PM GMT, key: 294851, value: 1540937751186,User_8,Page_12
-rowtime: 10/30/18 10:15:55 PM GMT, key: 295051, value: 1540937755255,User_1,Page_15
-rowtime: 10/30/18 10:15:57 PM GMT, key: 295111, value: 1540937757265,User_8,Page_10
-rowtime: 10/30/18 10:15:59 PM GMT, key: 295221, value: 1540937759330,User_4,Page_15
-rowtime: 10/30/18 10:15:59 PM GMT, key: 295231, value: 1540937759699,User_1,Page_12
-rowtime: 10/30/18 10:15:59 PM GMT, key: 295241, value: 1540937759990,User_6,Page_15
+rowtime: 10/30/18 10:15:51 PM GMT, key: 1540937751186, value: 1540937751186,User_8,Page_12
+rowtime: 10/30/18 10:15:55 PM GMT, key: 1540937755255, value: 1540937755255,User_1,Page_15
+rowtime: 10/30/18 10:15:57 PM GMT, key: 1540937757265, value: 1540937757265,User_8,Page_10
+rowtime: 10/30/18 10:15:59 PM GMT, key: 1540937759330, value: 1540937759330,User_4,Page_15
+rowtime: 10/30/18 10:15:59 PM GMT, key: 1540937759699, value: 1540937759699,User_1,Page_12
+rowtime: 10/30/18 10:15:59 PM GMT, key: 1540937759990, value: 1540937759990,User_6,Page_15
^CTopic printing ceased
```

Expand All @@ -294,13 +296,6 @@ Press Ctrl+C to stop printing the stream.
!!! note
The query continues to run after you stop printing the stream.

!!! note
    ksqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`.
    ksqlDB hasn't narrowed it further, because it's not possible to rule out
    either format just by inspecting the key's serialized bytes. In this case, we know the key
    is a `BIGINT`. In other cases, you may know the key type, or you may need to consult the
    author of the data.

Use the SHOW QUERIES statement to view the query that ksqlDB created for
the `pageviews_intro` stream:

@@ -364,4 +359,3 @@
----------

- [Join Event Streams with ksqlDB](joins/join-streams-and-tables.md)

69 changes: 33 additions & 36 deletions docs/developer-guide/create-a-table.md
@@ -52,13 +52,12 @@ In the ksqlDB CLI, paste the following CREATE TABLE statement:

```sql
CREATE TABLE users
-(registertime BIGINT,
-userid VARCHAR,
+(userid VARCHAR PRIMARY KEY,
+registertime BIGINT,
gender VARCHAR,
regionid VARCHAR)
WITH (KAFKA_TOPIC = 'users',
-VALUE_FORMAT='JSON',
-KEY = 'userid');
+VALUE_FORMAT='JSON');
```

Your output should resemble:
@@ -97,9 +96,8 @@ Your output should resemble:

```
Name : USERS
Field | Type
------------------------------------------
-ROWKEY | VARCHAR(STRING) (key)
+USERID | VARCHAR(STRING) (key)
REGISTERTIME | BIGINT
-USERID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
------------------------------------------
```

@@ -116,13 +114,13 @@ SELECT * FROM users EMIT CHANGES;
Assuming the table has content, your output should resemble:

```
-+--------+---------------+--------+--------+----------+
-| ROWKEY | REGISTERTIME | USERID | GENDER | REGIONID |
-+--------+---------------+--------+--------+----------+
-| User_2 | 1498028899054 | User_2 | MALE | Region_1 |
-| User_6 | 1505677113995 | User_6 | FEMALE | Region_7 |
-| User_5 | 1491338621627 | User_5 | OTHER | Region_2 |
-| User_9 | 1492621173463 | User_9 | FEMALE | Region_3 |
++--------+---------------+--------+----------+
+| USERID | REGISTERTIME | GENDER | REGIONID |
++--------+---------------+--------+----------+
+| User_2 | 1498028899054 | MALE | Region_1 |
+| User_6 | 1505677113995 | FEMALE | Region_7 |
+| User_5 | 1491338621627 | OTHER | Region_2 |
+| User_9 | 1492621173463 | FEMALE | Region_3 |
^CQuery terminated
```
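With `userid` as the PRIMARY KEY, downstream materializations key on it directly. A hypothetical sketch (the table name is assumed, and the pull query assumes pull queries are enabled and that the materialized key column is named `regionid`):

```sql
-- Hypothetical sketch: materialize an aggregate keyed by regionid, then query it.
CREATE TABLE users_by_region AS
  SELECT regionid, COUNT(*) AS user_count
  FROM users
  GROUP BY regionid
  EMIT CHANGES;

-- Pull query against the materialized table (key lookup):
SELECT * FROM users_by_region WHERE regionid = 'Region_1';
```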

@@ -144,15 +142,14 @@ the following CREATE TABLE statement into the CLI:

```sql
CREATE TABLE users
-(registertime BIGINT,
-userid VARCHAR,
+(userid VARCHAR PRIMARY KEY,
+registertime BIGINT,
gender VARCHAR,
regionid VARCHAR)
WITH (KAFKA_TOPIC = 'users',
VALUE_FORMAT='JSON',
PARTITIONS=4,
-REPLICAS=3
-KEY = 'userid');
+REPLICAS=3);
```

This statement creates the `users` topic for you, with the supplied partition and replica counts.
@@ -216,9 +213,9 @@ Your output should resemble:
```
Key format: KAFKA_STRING
Value format: JSON
rowTime: 12/21/18 23:58:42 PM PSD, key: User_5, value: {"USERID":"User_5","GENDER":"FEMALE","REGIONID":"Region_4"}
rowTime: 12/21/18 23:58:42 PM PSD, key: User_2, value: {"USERID":"User_2","GENDER":"FEMALE","REGIONID":"Region_7"}
rowTime: 12/21/18 23:58:42 PM PSD, key: User_9, value: {"USERID":"User_9","GENDER":"FEMALE","REGIONID":"Region_4"}
rowTime: 12/21/18 23:58:42 PM PSD, key: User_5, value: {"GENDER":"FEMALE","REGIONID":"Region_4"}
rowTime: 12/21/18 23:58:42 PM PSD, key: User_2, value: {"GENDER":"FEMALE","REGIONID":"Region_7"}
rowTime: 12/21/18 23:58:42 PM PSD, key: User_9, value: {"GENDER":"FEMALE","REGIONID":"Region_4"}
^CTopic printing ceased
```

@@ -282,17 +279,17 @@ SELECT ROWTIME, * FROM pageviews_table EMIT CHANGES;
Your output should resemble:

```
-+---------------+---------------+---------------+------------------+--------+---------+------+
-| ROWTIME | WINDOWSTART | WINDOWEND | ROWKEY | USERID | PAGEID | TOTAL|
-+---------------+---------------+---------------+------------------+--------+---------+------+
-| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1 |
-| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1 |
-| 1557183930211 | 1557183900000 | 1557183960000 | User_1|+|Page_79 | User_1 | Page_79 | 1 |
-| 1557183930687 | 1557183900000 | 1557183960000 | User_9|+|Page_34 | User_9 | Page_34 | 1 |
-| 1557183929786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 2 |
-| 1557183931095 | 1557183900000 | 1557183960000 | User_3|+|Page_43 | User_3 | Page_43 | 1 |
-| 1557183930184 | 1557183900000 | 1557183960000 | User_1|+|Page_29 | User_1 | Page_29 | 1 |
-| 1557183930727 | 1557183900000 | 1557183960000 | User_6|+|Page_93 | User_6 | Page_93 | 3 |
++---------------+---------------+---------------+------------------+------+
+| ROWTIME | WINDOWSTART | WINDOWEND | KSQL_COL_0 | TOTAL|
++---------------+---------------+---------------+------------------+------+
+| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | 1 |
+| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | 1 |
+| 1557183930211 | 1557183900000 | 1557183960000 | User_1|+|Page_79 | 1 |
+| 1557183930687 | 1557183900000 | 1557183960000 | User_9|+|Page_34 | 1 |
+| 1557183929786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | 2 |
+| 1557183931095 | 1557183900000 | 1557183960000 | User_3|+|Page_43 | 1 |
+| 1557183930184 | 1557183900000 | 1557183960000 | User_1|+|Page_29 | 1 |
+| 1557183930727 | 1557183900000 | 1557183960000 | User_6|+|Page_93 | 3 |
^CQuery terminated
```

Expand All @@ -304,16 +301,16 @@ Look up the value for a specific key within the table by using a SELECT
statement.

```sql
-SELECT * FROM pageviews_table WHERE ROWKEY='User_9|+|Page_39';
+SELECT * FROM pageviews_table WHERE KSQL_COL_0='User_9|+|Page_39';
```

Your output should resemble:

```
-+------------------+---------------+---------------+--------+---------+-------+
-| ROWKEY | WINDOWSTART | WINDOWEND | USERID | PAGEID | TOTAL |
-+------------------+---------------+---------------+--------+---------+-------+
-| User_9|+|Page_39 | 1557183900000 | 1557183960000 | User_9 | Page_39 | 1 |
++------------------+---------------+---------------+--------+
+| KSQL_COL_0 | WINDOWSTART | WINDOWEND | TOTAL |
++------------------+---------------+---------------+--------+
+| User_9|+|Page_39 | 1557183900000 | 1557183960000 | 1 |
Query terminated
```
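Because `KSQL_COL_0` is a system-generated name that can differ between versions, it's worth confirming the actual key column name before writing the lookup. A hypothetical sketch:

```sql
-- Hypothetical sketch: DESCRIBE shows the table's actual key column name.
DESCRIBE pageviews_table;
```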
