Stream created with 'PARTITION BY CAST(...' cannot be inserted into #5053

Closed

terryf82 opened this issue Apr 11, 2020 · 6 comments

@terryf82

When creating a stream, if you partition by a value that exists exactly as required in the source stream (PARTITION BY x) and run DESCRIBE EXTENDED against the new stream, you'll see x defined as the ROWKEY. Additional sources of data can then be folded into this stream using INSERT INTO, provided they have a conforming schema.
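
As a minimal sketch of that working case (ORDERS, ORDERS_LEGACY and ORDERS_BY_ID are hypothetical names, not from this issue):

-- Partition by a column that exists as-is in the source:
CREATE STREAM ORDERS_BY_ID AS
    SELECT ORDER_ID, AMOUNT
    FROM ORDERS
    PARTITION BY ORDER_ID;

-- DESCRIBE EXTENDED ORDERS_BY_ID reports 'Key field : ORDER_ID', and a
-- second conforming source can then be folded in:
INSERT INTO ORDERS_BY_ID
    SELECT ORDER_ID, AMOUNT
    FROM ORDERS_LEGACY
    PARTITION BY ORDER_ID;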

If you instead create a stream that partitions by a re-cast value from the source stream (PARTITION BY CAST(x AS STRING)) and run DESCRIBE EXTENDED, the stream does not have a defined / named ROWKEY. This doesn't prevent the stream from being joined to another stream using the partitioning key.
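
For example, a join along these lines still works against such a stream (generic, hypothetical names; T is a table keyed on ID):

-- S was created with PARTITION BY CAST(X AS STRING) and shows no key
-- field, yet it can still be joined on that key:
CREATE STREAM S_ENRICHED AS
    SELECT S.X, T.NAME
    FROM S
    JOIN T ON S.X = T.ID;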

However, you are prevented from running INSERT INTO against this second kind of stream, even if the additional source of data has a conforming schema:

ksql> CREATE STREAM KS_ALL_CATEGORIES WITH (VALUE_FORMAT='AVRO', PARTITIONS=6) AS SELECT T1.OP AS OP, CAST(T1.RECORD->PRODUCT_ID AS STRING) AS PRODUCT_ID, T2.NAME AS CATEGORY_NAME, 'api' AS SOURCE FROM KS_CATEGORYLINKS_KEYED T1 JOIN KT_CATEGORIES T2 ON T1.CATEGORY_ID = T2.ID PARTITION BY CAST(T1.RECORD->PRODUCT_ID AS STRING);

ksql> DESCRIBE EXTENDED KS_ALL_CATEGORIES;

Name                 : KS_ALL_CATEGORIES
Type                 : STREAM
Key field            :
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
...

and the stream to be inserted into it -

ksql> DESCRIBE KS_PUBLISHINGCATEGORIES;

Name                 : KS_PUBLISHINGCATEGORIES
 Field         | Type
-------------------------------------------
 ROWTIME       | BIGINT           (system)
 ROWKEY        | VARCHAR(STRING)  (system)
 OP            | VARCHAR(STRING)
 PRODUCT_ID    | VARCHAR(STRING)
 CATEGORY_NAME | VARCHAR(STRING)
 SOURCE        | VARCHAR(STRING)
-------------------------------------------

ksql> INSERT INTO KS_ALL_CATEGORIES SELECT OP, PRODUCT_ID, CATEGORY_NAME, SOURCE FROM KS_PUBLISHINGCATEGORIES PARTITION BY PRODUCT_ID;
Incompatible key fields for sink and results. Sink key field is null (type: null) while result key field is PRODUCT_ID (type: STRING)

But if KS_ALL_CATEGORIES is created in a way that gives it an explicitly defined ROWKEY -

CREATE STREAM KS_API_CATEGORIES WITH (VALUE_FORMAT='AVRO', PARTITIONS=6) AS SELECT T1.OP AS OP, CAST(T1.RECORD->PRODUCT_ID AS STRING) AS PRODUCT_ID, T2.NAME AS CATEGORY_NAME, 'api' AS SOURCE FROM KS_CATEGORYLINKS_KEYED T1 JOIN KT_GB_CATEGORIES T2 ON T1.CATEGORY_ID = T2.ID PARTITION BY CAST(T1.RECORD->PRODUCT_ID AS STRING);

CREATE STREAM KS_ALL_CATEGORIES WITH (VALUE_FORMAT='AVRO', PARTITIONS=6) AS SELECT * FROM KS_API_CATEGORIES PARTITION BY PRODUCT_ID;

ksql> DESCRIBE EXTENDED KS_ALL_CATEGORIES;

Name                 : KS_ALL_CATEGORIES
Type                 : STREAM
Key field            : PRODUCT_ID
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
...

it can now be inserted into -

ksql> INSERT INTO KS_ALL_CATEGORIES SELECT OP, PRODUCT_ID, CATEGORY_NAME, SOURCE FROM KS_PUBLISHINGCATEGORIES PARTITION BY PRODUCT_ID;
 Message
---------------------------------------
 Created query with ID InsertQuery_131
---------------------------------------

Using ksqlDB 0.8

terryf82 added the bug label Apr 11, 2020
@terryf82

cc @agavra this was the issue I mentioned to you via Slack.

apurvam added this to the 0.9.0 milestone Apr 13, 2020
@apurvam commented Apr 13, 2020

cc @big-andy-coates, who recently changed the PARTITION BY semantics and has the freshest context. This seems like a bug in evaluating expressions in the PARTITION BY clause.

@apurvam commented Apr 14, 2020

This may help #4952

@big-andy-coates

Ack

spena modified the milestones: 0.9.0, 0.10.0 Apr 27, 2020
@big-andy-coates

tl;dr: this will be fixed in master once #5363 is merged, hence closing the issue.

So the issue here is around the checking of the 'key field' of the source the INSERT INTO is targeting. The 'key field' is a column in the value that is a duplicate of the data stored in the key.
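
To illustrate with hypothetical names (not from this issue):

-- After this statement the Kafka message key holds USER_ID, and the
-- value still carries a USER_ID column duplicating it; pre-0.10 ksqlDB
-- tracks that duplicate column as the stream's 'key field'.
CREATE STREAM CLICKS_BY_USER AS
    SELECT USER_ID, PAGE
    FROM CLICKS
    PARTITION BY USER_ID;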

The INSERT INTO is failing because of a mismatch between the 'key field' in the target stream and the query. TBH, I'm not even sure why INSERT INTO would be checking the key field for compatibility. It should only care that the schemas match. So, yes, this is a bug in 0.8, and likely 0.9. However...

This will be fixed in master / 0.10.0 as soon as #5363 is merged, as this PR completely removes the concept of a 'key field'.

@panasenco

For anyone reading this who's still on 0.9.0, another workaround is to cast the partitioning field to a string, even if it's already a string: PARTITION BY CAST(product_id AS STRING).
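
Presumably that means writing the failing INSERT INTO from this issue as below; my reading is that the CAST leaves the query's key field null too, matching the sink's null key field:

ksql> INSERT INTO KS_ALL_CATEGORIES SELECT OP, PRODUCT_ID, CATEGORY_NAME, SOURCE FROM KS_PUBLISHINGCATEGORIES PARTITION BY CAST(PRODUCT_ID AS STRING);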
