
KSQL joins will fail silently if a table's topic message key does not match the declared KSQL Table key #749

Open
rmoff opened this Issue Feb 16, 2018 · 6 comments


rmoff commented Feb 16, 2018

KSQL supports joining streams to tables. However, for this to work, the message key of the table's underlying Kafka topic must be the column on which the join is made. If it isn't, KSQL currently fails silently: the join produces no matches, and it is not obvious to the user (particularly one from the database world and familiar with SQL) why it doesn't work.

Consider a simple stream/table (event/reference, a.k.a. fact/dimension) join:

  • RENTAL is a stream of rental events, with various foreign key relationships including a CUSTOMER_ID
  • CUSTOMER is a table of customer information, with a primary key of CUSTOMER_ID

The data in this example comes from MySQL, connected into Kafka using Debezium.

MySQL:

  • The event data:

      mysql> SELECT R.RENTAL_ID, R.RENTAL_DATE, R.CUSTOMER_ID FROM RENTAL R WHERE R.CUSTOMER_ID=603;
      +-----------+---------------------+-------------+
      | RENTAL_ID | RENTAL_DATE         | CUSTOMER_ID |
      +-----------+---------------------+-------------+
      |     16050 | 2018-02-16 18:31:55 |         603 |
      |     16051 | 2018-02-16 18:34:37 |         603 |
      |     16052 | 2018-02-16 18:35:01 |         603 |
      |     16053 | 2018-02-16 18:35:07 |         603 |
      +-----------+---------------------+-------------+
      4 rows in set (0.00 sec)
    
  • The reference data:

      mysql> SELECT C.FIRST_NAME,C.LAST_NAME,C.CUSTOMER_ID FROM CUSTOMER C WHERE C.CUSTOMER_ID=603;
      +------------+-----------+-------------+
      | FIRST_NAME | LAST_NAME | CUSTOMER_ID |
      +------------+-----------+-------------+
      | BOB        | Astley    |         603 |
      +------------+-----------+-------------+
      1 row in set (0.00 sec)
    
  • The executed join in MySQL:

      mysql> SELECT R.RENTAL_ID, R.RENTAL_DATE, R.CUSTOMER_ID, C.FIRST_NAME, C.LAST_NAME FROM RENTAL R LEFT OUTER JOIN CUSTOMER C ON R.CUSTOMER_ID=C.CUSTOMER_ID WHERE C.FIRST_NAME IS NOT NULL AND R.CUSTOMER_ID=603;
      +-----------+---------------------+-------------+------------+-----------+
      | RENTAL_ID | RENTAL_DATE         | CUSTOMER_ID | FIRST_NAME | LAST_NAME |
      +-----------+---------------------+-------------+------------+-----------+
      |     16050 | 2018-02-16 18:31:55 |         603 | BOB        | Astley    |
      |     16051 | 2018-02-16 18:34:37 |         603 | BOB        | Astley    |
      |     16052 | 2018-02-16 18:35:01 |         603 | BOB        | Astley    |
      |     16053 | 2018-02-16 18:35:07 |         603 | BOB        | Astley    |
      +-----------+---------------------+-------------+------------+-----------+
      4 rows in set (0.00 sec)
    

Now the same in KSQL:

  • Stream:

      ksql> CREATE STREAM RENTAL WITH (KAFKA_TOPIC='fullfillment.sakila.rental-flat', VALUE_FORMAT='AVRO');
    
       Message
      ----------------
       Stream created
      ----------------
    
      ksql> SELECT R.RENTAL_ID, R.RENTAL_DATE, R.CUSTOMER_ID FROM RENTAL R WHERE R.CUSTOMER_ID=603;
      16050 | 1518805915000 | 603
      16051 | 1518806077000 | 603
      16052 | 1518806101000 | 603
      16053 | 1518806107000 | 603
    
  • Table:

      ksql> CREATE TABLE CUSTOMER WITH (KAFKA_TOPIC='fullfillment.sakila.customer-flat',VALUE_FORMAT='AVRO',KEY='customer_id');
    
       Message
      ---------------
       Table created
      ---------------
    
      ksql> SELECT C.FIRST_NAME,C.LAST_NAME,C.CUSTOMER_ID FROM CUSTOMER C WHERE C.CUSTOMER_ID=603;
      BOB | Astley | 603
    
  • Join:

      ksql> SELECT R.RENTAL_ID, R.RENTAL_DATE, R.CUSTOMER_ID, C.FIRST_NAME, C.LAST_NAME FROM RENTAL R LEFT OUTER JOIN CUSTOMER C ON R.CUSTOMER_ID=C.CUSTOMER_ID WHERE C.FIRST_NAME IS NOT NULL AND R.CUSTOMER_ID=603;
      [... no output ...]
    

Here is the problem: the key that we declared for the table (KEY='customer_id') does not match the key of the Kafka message:

ksql> SELECT ROWKEY,CUSTOMER_ID FROM CUSTOMER WHERE CUSTOMER_ID=603;
�        | 603

Examining the underlying Kafka topic:

$ kafkacat -C -K: -b localhost:9092 -f 'Key:    %k\nKey Bytes: %K\nValue:  %s\nValue Bytes: %S\n\n' -t fullfillment.sakila.customer-flat

Key:    �
Key Bytes: 7
Value:  Y�      BOBAstley����X22018-02-16T18:47:58+01:00
Value Bytes: 54

The same data, Avro-deserialised:

$ ./bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--topic fullfillment.sakila.customer-flat \
--from-beginning

{"customer_id":603}     {"customer_id":603,"store_id":1,"first_name":"BOB","last_name":"Astley","email":null,"address_id":1,"active":1,"create_date":1518805799000,"last_update":"2018-02-16T18:47:58+01:00"}
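The 7-byte key is consistent with the Confluent Schema Registry wire format used by the Avro serialiser: a 0x00 magic byte, a 4-byte big-endian schema ID, then the Avro binary body. A record whose only field is an int is encoded as just that field, and Avro writes ints as zigzag varints, so 603 takes 2 bytes and the whole key 5 + 2 = 7. The plain string "603" that KSQL expects is only 3 bytes. A minimal sketch, assuming customer_id is an Avro int or long (both use the same encoding) and a hypothetical schema ID of 1, since the real ID isn't visible in the output above:

```python
import struct


def zigzag_varint(n: int) -> bytes:
    """Encode an integer the way Avro encodes int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small negative and positive numbers become small unsigned ones
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def confluent_avro_int_key(value: int, schema_id: int) -> bytes:
    """Key bytes for an Avro record with a single int field, in Confluent wire format."""
    # magic byte 0x00 + 4-byte big-endian schema ID + Avro body (just the one field)
    return b"\x00" + struct.pack(">I", schema_id) + zigzag_varint(value)


avro_key = confluent_avro_int_key(603, schema_id=1)  # schema_id=1 is hypothetical
string_key = "603".encode("utf-8")                   # what KSQL assumes the key looks like
print(len(avro_key), avro_key.hex())  # 7 0000000001b609
print(len(string_key))                # 3
```

The two byte sequences can never be equal, which is why a lookup by the string "603" finds nothing in a table keyed on the Avro-encoded bytes.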

So technically KSQL is evaluating the join correctly, but in practice this is going to suck for the end user, particularly one who is not familiar with Kafka's key/value message structure.
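Why the join "works correctly" yet returns nothing can be shown with a toy model: the table is a map from raw message-key bytes to the latest row, and the stream-table join looks up each stream row's join column serialised as a string (the key format KSQL assumes). This is a simplification for illustration, not KSQL's actual implementation (which runs on Kafka Streams and also repartitions the stream); the helper names and the Avro key bytes (built with a hypothetical schema ID of 1) are assumptions.

```python
def build_table(messages):
    """Materialise a table: for each raw message key, the latest value wins."""
    table = {}
    for key_bytes, value in messages:
        table[key_bytes] = value
    return table


def left_join(stream_rows, table, join_col):
    """Stream-table LEFT JOIN: look up each stream row's join column as a UTF-8 STRING key."""
    joined = []
    for row in stream_rows:
        lookup_key = str(row[join_col]).encode("utf-8")  # KSQL assumed STRING keys
        joined.append((row, table.get(lookup_key)))      # None when there is no match
    return joined


customer = {"customer_id": 603, "first_name": "BOB", "last_name": "Astley"}
rentals = [{"rental_id": r, "customer_id": 603} for r in (16050, 16051, 16052, 16053)]

tables = {
    # Keyed as in the source topic: an Avro-serialised key
    # (Confluent framing, hypothetical schema ID 1).
    "avro-keyed": build_table([(b"\x00\x00\x00\x00\x01\xb6\x09", customer)]),
    # Keyed as after PARTITION BY CUSTOMER_ID: the string "603".
    "rekeyed": build_table([(b"603", customer)]),
}

results = {}
for name, table in tables.items():
    # WHERE C.FIRST_NAME IS NOT NULL: keep only rows that found a table match
    # (the matched customer row has a non-null first_name).
    results[name] = [
        (row["rental_id"], match["first_name"])
        for row, match in left_join(rentals, table, "customer_id")
        if match is not None
    ]
    print(name, results[name])
```

With the Avro-keyed table every lookup misses and the filter leaves no rows; with the rekeyed table all four rentals are enriched, matching the KSQL results after the workaround.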

The workaround is to manually rekey the topic:

ksql> CREATE STREAM CUST_RAW_STREAM WITH (KAFKA_TOPIC='fullfillment.sakila.customer-flat', VALUE_FORMAT='AVRO');

Message
----------------
Stream created
----------------

ksql> CREATE STREAM CUSTOMER_REKEYED AS SELECT * FROM CUST_RAW_STREAM PARTITION BY CUSTOMER_ID;

Message
----------------------------
Stream created and running
----------------------------

ksql> CREATE TABLE customer WITH (KAFKA_TOPIC='CUSTOMER_REKEYED', VALUE_FORMAT='avro', KEY='CUSTOMER_ID');

Message
---------------
Table created
---------------

The resulting topic is keyed correctly (i.e. the key is the CUSTOMER_ID):

$ kafkacat -C -K: -b localhost:9092 -f 'Key:    %k\nKey Bytes: %K\nValue:  %s\nValue Bytes: %S\n\n' -t CUSTOMER_REKEYED
Key:    603
Key Bytes: 3
Value:  ~�      BOBAstley����X22018-02-16T18:47:58+01:00
Value Bytes: 62

Now in the KSQL table the ROWKEY matches CUSTOMER_ID:

ksql> SELECT ROWKEY,CUSTOMER_ID FROM CUSTOMER WHERE CUSTOMER_ID=603;
603 | 603

and the desired join succeeds:

ksql> SELECT R.RENTAL_ID, R.RENTAL_DATE, R.CUSTOMER_ID, C.FIRST_NAME, C.LAST_NAME FROM RENTAL R LEFT OUTER JOIN CUSTOMER C ON R.CUSTOMER_ID=C.CUSTOMER_ID WHERE C.FIRST_NAME IS NOT NULL;
16050 | 1518805915000 | 603 | BOB | Astley
16051 | 1518806077000 | 603 | BOB | Astley
16052 | 1518806101000 | 603 | BOB | Astley
16053 | 1518806107000 | 603 | BOB | Astley

How do we make this less painful for the user? Several ideas:

  1. Some kind of rekey operation that takes a CREATE TABLE declaration and explicitly rekeys the source topic (i.e. implements the above workaround).
    This could be done:

    • automatically (best UX for new users, but overkill if already correctly keyed)
      • with performance optimisation to automagically skip the rekey if the topic was already keyed on the declared table key.
      • and/or with explicit 'NOREKEY' syntax
    • on demand e.g. with a REKEY option in the CREATE TABLE declaration (requires users to know to look for the option)
  2. Evaluate a sample of messages and warn the user if the message key doesn't match the declared table key
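Idea 2 could look something like the following sketch: take a sample of messages, decode each key as a string (the format KSQL assumes), and warn if it doesn't equal the declared key column in the value. The same check could drive the "skip the rekey if already keyed" optimisation in idea 1. The function name, the threshold parameter and the warning text are hypothetical, and the sample is assumed to already be fetched as (key bytes, deserialised value) pairs.

```python
import warnings


def check_declared_key(sample, key_column, min_match_ratio=1.0):
    """Return the fraction of sampled messages whose STRING key equals value[key_column].

    `sample` is a list of (key_bytes, value_dict) pairs. A warning is emitted
    when the fraction falls below `min_match_ratio`.
    """
    if not sample:
        return 1.0
    matches = 0
    for key_bytes, value in sample:
        try:
            key = None if key_bytes is None else key_bytes.decode("utf-8")
        except UnicodeDecodeError:
            key = None  # not a UTF-8 string key, e.g. an Avro-serialised key
        if key is not None and key == str(value.get(key_column)):
            matches += 1
    ratio = matches / len(sample)
    if ratio < min_match_ratio:
        warnings.warn(
            f"{1 - ratio:.0%} of sampled message keys do not match declared key column "
            f"'{key_column}'; joins against this table may silently return no rows"
        )
    return ratio


value = {"customer_id": 603, "first_name": "BOB"}
check_declared_key([(b"\x00\x00\x00\x00\x01\xb6\x09", value)], "customer_id")  # warns, returns 0.0
check_declared_key([(b"603", value)], "customer_id")                            # returns 1.0
```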

Something that would also help reduce instances of this (but not avoid the problem entirely) would be to support different key formats. In this case, the key is the declared CUSTOMER_ID, but it is serialised as Avro rather than as the String that KSQL currently assumes.


Interestingly, a side-effect of KSQL only using STRING keys is that messages on the derived topic cannot be read using kafka-avro-console-consumer with print.key=true:

Robin@asgard02 ~/c/confluent-4.0.0> ./bin/kafka-avro-console-consumer \
                                    --bootstrap-server localhost:9092 \
                                    --property schema.registry.url=http://localhost:8081 \
                                    --property print.key=true --topic CUSTOMER_REKEYED --from-beginning
Processed a total of 1 messages
[2018-02-16 19:11:53,976] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:107)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
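The "Unknown magic byte!" error follows from the same wire format: the Avro deserialiser expects every key to start with a 0x00 magic byte followed by a 4-byte schema ID, whereas the string key "603" written by KSQL starts with 0x36 ('6'). A minimal sketch of that header check; it is a simplification for illustration, not the deserialiser's actual code, and the schema ID of 1 is hypothetical:

```python
import struct


def parse_confluent_header(data: bytes) -> int:
    """Return the schema ID from a Confluent-framed payload, or fail as the Avro deserialiser would."""
    if len(data) < 5 or data[0] != 0:
        raise ValueError("Unknown magic byte!")
    (schema_id,) = struct.unpack(">I", data[1:5])  # 4-byte big-endian schema ID
    return schema_id


print(parse_confluent_header(b"\x00\x00\x00\x00\x01\xb6\x09"))  # 1 (Avro key, hypothetical schema ID)
try:
    parse_confluent_header(b"603")  # STRING key written by KSQL after PARTITION BY
except ValueError as err:
    print(err)  # Unknown magic byte!
```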

apurvam commented Feb 27, 2018

Related: #804

vahidhashemian commented Mar 12, 2018

@rmoff, thanks for explaining the issue. I came across it while trying to find a solution to the following problem.
I have the following stream and table:
stream s(a)
table t(b, a), where b is the GROUP BY key
When I LEFT JOIN the two on a, I get nulls for the table columns.
Am I hitting this issue? Is this a limitation of KSQL or of Kafka Streams?

pavel-agarkov commented Aug 8, 2018

Does this mean that joins through reference tables (many-to-many) are impossible in KSQL?
For example, if I have a separate table like UserRoles (UserId, RoleId), I could probably repartition it like this:

CREATE STREAM UserRoles_Repartitioned AS
SELECT UserId + '-' + RoleId AS Id, UserId, RoleId
FROM UserRoles_FromDebezium
PARTITION BY Id;

and then use it to create table UserRoles and use in joins like this:

CREATE STREAM UserRoles_ReadModel AS
SELECT u.UserName, r.RoleName, ...
FROM Users AS u
JOIN Roles AS r ON 1=1
JOIN UserRoles ur ON ur.Id = u.Id + '-' + r.Id;

Is this the way to go?

EDIT: changed customer/order to user/role since it is a more realistic example...

rmoff commented Aug 8, 2018

@pavel-agarkov You can't have more than one JOIN in a query, but you can create a daisy-chained set of queries that would have the same nett effect. If you've got more questions please open a new issue dedicated to this, or head over to #ksql on http://cnfl.io/slack.

karthikeyanrd27 commented Sep 21, 2018

The WHERE condition is not working properly in KSQL.

ksql> select * from TBL_PLN_PRO_DIV_SDIV;
1537547964489 | 658 | U | 2018-07-18 18:51:34.000000 | 2018-07-18 14:51:39.397000 | 00000003470236836100 | 382591 | UNVSO Hanes 17/18 UF NBC Universo (382446)-V2 | 382446 | null | null | null | -1 | 10009 | 101 | 658 | 25-JUN-18 | 26-JUN-18 | 02-APR-18 | 01-JUL-18 | 1170001 | National | 1599001 | 14831 | 17989 | 3087 | 3085 | Bravo | NBC News | SNF

ksql> SELECT * FROM TBL_MS_TARGET_GROUP;
1537554188748 | 2476 | NBC_APPS.TBL_MS_TARGET_GROUP | I | 2018-07-18 18:51:34.000000 | 2018-07-18 14:51:39.397000 | 00000003470236836100 | 2476 | P25-54 | 5090 | 108+109+110+111+112+113+9+10+20+21+22+23 | 1577001 | 1694001 | 30-AUG-18

The join is working properly:

ksql> SELECT A.PRIMARY_DEMO_ID,B.DEMO_ID FROM TBL_PLN_PRO_DIV_SDIV A LEFT JOIN TBL_MS_TARGET_GROUP B ON (A.PRIMARY_DEMO_ID=B.DEMO_ID);
3085 | null

But the WHERE condition does not work with the LEFT JOIN:

ksql> SELECT * FROM TBL_MS_TARGET_GROUP WHERE OP_TYPE = 'I';
1537554188748 | 2476 | NBC_APPS.TBL_MS_TARGET_GROUP | I | 2018-07-18 18:51:34.000000 | 2018-07-18 14:51:39.397000 | 00000003470236836100 | 2476 | P25-54 | 5090 | 108+109+110+111+112+113+9+10+20+21+22+23 | 1577001 | 1694001 | 30-AUG-18

ksql> SELECT A.PRIMARY_DEMO_ID,B.DEMO_ID FROM TBL_PLN_PRO_DIV_SDIV A LEFT JOIN TBL_MS_TARGET_GROUP B ON (A.PRIMARY_DEMO_ID=B.DEMO_ID) where B.OP_TYPE = 'I';
No response.

ksql> SELECT A.PRIMARY_DEMO_ID,B.DEMO_ID FROM TBL_PLN_PRO_DIV_SDIV A LEFT JOIN TBL_MS_TARGET_GROUP B ON (A.PRIMARY_DEMO_ID=B.DEMO_ID);

3085 | null
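One likely explanation, assuming the output above is the full join result: the LEFT JOIN found no TBL_MS_TARGET_GROUP row for PRIMARY_DEMO_ID 3085 (which could itself be the key mismatch described in this issue), so every B column, including B.OP_TYPE, is null, and WHERE B.OP_TYPE = 'I' then rejects the row. That is standard SQL join-then-filter behaviour, the same pattern as WHERE C.FIRST_NAME IS NOT NULL in the original report. A toy sketch; only 3085 comes from the output above, and the B row's DEMO_ID of 2476 is a hypothetical non-matching value:

```python
def left_join(left_rows, right_rows, left_col, right_col):
    """Nested-loop LEFT JOIN: unmatched left rows are paired with None (all right columns null)."""
    out = []
    for a in left_rows:
        matches = [b for b in right_rows if b[right_col] == a[left_col]]
        if matches:
            out.extend((a, b) for b in matches)
        else:
            out.append((a, None))
    return out


a_rows = [{"PRIMARY_DEMO_ID": 3085}]
# Hypothetical: DEMO_ID is assumed not to be 3085, as the null in the join output implies.
b_rows = [{"DEMO_ID": 2476, "OP_TYPE": "I"}]

joined = left_join(a_rows, b_rows, "PRIMARY_DEMO_ID", "DEMO_ID")
print([(a["PRIMARY_DEMO_ID"], b and b["DEMO_ID"]) for a, b in joined])  # [(3085, None)]

# WHERE B.OP_TYPE = 'I' runs after the join; on the unmatched row B.OP_TYPE is null,
# so the predicate is not true and the row is dropped.
filtered = [(a, b) for a, b in joined if b is not None and b["OP_TYPE"] == "I"]
print(filtered)  # []
```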

rmoff commented Sep 24, 2018

@karthikeyanrd27 please open a new issue with details, and you can reference this one to link it if you think they are related. It makes it easier to track and debug specific problems. When you raise the issue, please can you include your schema (SELECT * doesn't tell me which columns there are, and one of the key things in JOINs is the key, which isn't clear here).
