
Duplicate Key On KSQL Table #7486

Closed
karno123 opened this issue May 10, 2021 · 8 comments

@karno123

karno123 commented May 10, 2021

Describe the bug
Found duplicate keys on a KSQL table when ingesting from an Oracle data source, as shown in the attached screenshot.

To Reproduce
I followed the steps below:

1. Create a Kafka connector with:

CREATE SOURCE CONNECTOR CONNECTOR_NAME WITH(
'connector.class'='io.confluent.connect.jdbc.JdbcSourceConnector',
'tasks.max'='1',
'connection.url'='',
'connection.user'='',
'connection.password'='',
'db.timezone'='UTC',
'database.serverTimezone'='UTC',
'key.converter'='io.confluent.connect.avro.AvroConverter',
'key.converter.schemas.enable'='false',
'key.converter.schema.registry.url'='http://localhost:8081',
'value.converter'='io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'='http://localhost:8081',
'value.converter.schemas.enable'='false',
'timestamp.delay.interval.ms'='1000',
'topic.prefix'='MFS.CPSMGT.',
'mode'='timestamp',
'table.whitelist'='CPSMGT.CPS_CUSTOMER_KYC',
'timestamp.column.name'='LOAD_DATA_TS',
'quote.sql.identifiers'='never',
'validate.non.null'='false',
'transforms'='cast',
'transforms.cast.type'='org.apache.kafka.connect.transforms.Cast$Value',
'transforms.cast.spec'='IDENTITYID:string'
);
2. Create a stream:
CREATE STREAM CPS_CUSTOMER_KYC_STREAM
WITH (
KAFKA_TOPIC = 'MFS.CPSMGT.CPS_CUSTOMER_KYC',
VALUE_FORMAT = 'AVRO',
TIMESTAMP = 'LOAD_DATA_TS'
);

3. Create a stream to re-key the topic:
CREATE STREAM CPS_CUSTOMER_KYC_STREAM_REKEYED WITH (
KAFKA_TOPIC='CPS_CUSTOMER_KYC_STREAM_REKEYED',
PARTITIONS='8'
)
AS SELECT * FROM CPS_CUSTOMER_KYC_STREAM PARTITION BY IDENTITYID;
4. Create a table:
CREATE TABLE CPS_CUSTOMER_KYC_TABLE
(IDENTITYID STRING PRIMARY KEY)
WITH (
KAFKA_TOPIC = 'CPS_CUSTOMER_KYC_STREAM_REKEYED',
VALUE_FORMAT = 'AVRO',
TIMESTAMP = 'LOAD_DATA_TS'
);

Expected behavior
When querying CPS_CUSTOMER_KYC_TABLE with a specific id, the query result should return only 1 row.

Actual behaviour
A query against CPS_CUSTOMER_KYC_TABLE filtered by the table key returns more than one row per key (see attached screenshot).
[Screenshot: Screen Shot 2021-05-10 at 14 24 29]

Additional context
Can anybody help me with this problem?

@Harshith2396

Hey there @karno123, one thing to remember about ksqlDB is that it's not a relational DB like Oracle or a NoSQL store like MongoDB. It does not store data uniquely. ksqlDB is conceptually a logging DB, meaning that when you query the data without any aggregation you will see all of the insertions. The primary key only takes effect when there is an aggregation over the data.

In other words, when you query a table with EMIT CHANGES you are actually querying the changelog of the table, and in your pic that is exactly what your query is doing. To see only one row per key you have to materialize the data with some form of aggregation, i.e. write a query that does not contain the two words EMIT CHANGES (see the sketch below). For now, remember that the primary-key concepts in ksqlDB and Oracle are different: ksqlDB will not enforce unique key values without an aggregation. They say this will be added in a future release; this is a limitation I have discovered.
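For example, a minimal sketch of materializing the latest value per key with an aggregation (LATEST_BY_OFFSET needs ksqlDB 0.8+, and KYC_STATUS is a made-up column name; adjust to the real schema):

-- Hypothetical sketch: collapse the changelog to one row per key via aggregation.
CREATE TABLE CPS_CUSTOMER_KYC_LATEST AS
SELECT IDENTITYID,
       LATEST_BY_OFFSET(KYC_STATUS) AS KYC_STATUS
FROM CPS_CUSTOMER_KYC_STREAM
GROUP BY IDENTITYID
EMIT CHANGES;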
Hope this helps.

@karno123
Author

karno123 commented May 10, 2021

Hi @Harshith2396, thanks for replying.
Just curious: I did the same process with other tables, and it appears to work, as in the attached pic. The stream has more than one row, but the table only gives me the latest one (I don't know whether this is accidental). If a ksqlDB table works the way you describe, how do you deal with the following case (or will ksqlDB simply not work with this scenario)?

  1. I have a user table from a database, published to Kafka, and the user table may be updated by users.
  2. I have transactions streaming in from another system.
  3. I enrich the transaction data with the customer table and publish it to a Kafka topic to be consumed by other systems (see the sketch below).

I need your suggestion 🙇

[Screenshot: Screen Shot 2021-05-10 at 21 42 53]
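For reference, here is a minimal sketch of what I mean in step 3 (TRANSACTIONS_STREAM, USERS_TABLE, and the columns are hypothetical names):

-- Hypothetical sketch: enrich each transaction with the current user row.
-- ksqlDB looks up the table by its primary key at processing time.
CREATE STREAM TRANSACTIONS_ENRICHED AS
SELECT t.TXN_ID,
       t.USERID,
       t.AMOUNT,
       u.REGIONID
FROM TRANSACTIONS_STREAM t
LEFT JOIN USERS_TABLE u ON t.USERID = u.USERID
EMIT CHANGES;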

@PeterLindner

From the docs for Tables:

Tables work by leveraging the keys of each row. If a sequence of rows shares a key, the last row for a given key represents the most up-to-date information for that key's identity.

Sounds like you may also want to read more about the types of queries ksqlDB supports. If you want to query ksqlDB like a traditional RDBMS, check out PULL queries, but be sure to also read about their limitations.
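For example, a pull query runs against the current state of a materialized table and terminates, instead of streaming the changelog (CPS_CUSTOMER_KYC_LATEST is the hypothetical aggregated table sketched above):

-- Pull query: no EMIT CHANGES; returns the current row for the key and exits.
SELECT * FROM CPS_CUSTOMER_KYC_LATEST WHERE IDENTITYID = 'xxx';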

@mjsax
Member

mjsax commented May 11, 2021

@karno123 Can we close this ticket (or re-label as "question" instead of "bug")? As mentioned above, when you use the EMIT CHANGES clause, you query the table's changelog stream. Thus, the query works as expected.

The underlying question now is: what are you actually trying to do, and why is the observed behavior a problem?

@karno123
Author

Hi @mjsax, oh okay, you can re-label this thread as a question instead of a bug.

@karno123
Author

Hi @PeterLindner, just a simple question. Suppose I create a table like this (https://docs.confluent.io/5.4.2/ksql/docs/developer-guide/create-a-table.html):
CREATE TABLE users (registertime BIGINT, userid VARCHAR, gender VARCHAR, regionid VARCHAR) WITH (KAFKA_TOPIC = 'users', VALUE_FORMAT='JSON', KEY = 'userid');

If there are two duplicate rows in users with the same userid xxx, and I then query with the filter userid=xxx, will this query return 2 rows? If yes, what is actually the difference between a table and a stream? And why do we need to set a key at all, if the key itself does not guarantee the uniqueness of the table's contents?

@mjsax mjsax self-assigned this May 11, 2021
@mjsax
Member

mjsax commented May 11, 2021

If there are two duplicate rows in users with the same userid xxx, and I then query with the filter userid=xxx, will this query return 2 rows?

If the input contains two updates for user xxx, then the output topic will contain two update records, too. I would not call them two "rows", though; they are two updates to a single row.

what is actually the difference between a table and a stream

There are many differences, because the semantics of the two are totally different. A table allows for updates (not physical in-place updates, but updates semantically), while a stream is immutable and append-only.

Take a filter on a stream, for example: if you have the three inputs <k,a> <k,null> <k,null> and you filter on k, you get 3 outputs: <k,a> <k,null> <k,null>. If you read the same data as a table, the <k,null> records are interpreted as deletes, and thus the result would be <k,a> <k,null> (the third record would be an idempotent delete of a non-existing row, and thus does not produce a redundant delete in the result).

Assume you have a count query with input <k,a> <k,b> <k,c>. If you process the data as a stream, the output is <k,1> <k,2> <k,3>, counting all records with key k. If you do the same but read the data as a table, you get <k,1> <k,1> <k,1>, because there is always just one row with key k: <k,a> is an "insert" into the empty table, and <k,b> semantically updates the row from a to b, so the count of rows with key k is still one after the update (and similarly for <k,c>).

Even if a table changelog contains multiple records for the same key, they are updates to the same "row". Also note that for table changelogs, ksqlDB configures the topic with log compaction, allowing older versions of each row to be garbage collected; there is no retention time, and thus the latest record per key won't ever be deleted. For streams, which are append-only, keys are only used for partitioning and don't have any semantic meaning; the topic won't be configured with compaction, but a retention time will be applied to guard against unbounded growth.
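A minimal sketch of the count example, assuming a shared topic t with JSON values (all names and the schema are illustrative):

-- Read the same topic once as a stream and once as a table.
CREATE STREAM S (K STRING KEY, V STRING)
WITH (KAFKA_TOPIC='t', VALUE_FORMAT='JSON');
CREATE TABLE T (K STRING PRIMARY KEY, V STRING)
WITH (KAFKA_TOPIC='t', VALUE_FORMAT='JSON');

-- Stream semantics: counts every record with key k -> <k,1> <k,2> <k,3>
SELECT K, COUNT(*) FROM S GROUP BY K EMIT CHANGES;

-- Table semantics: each record updates the single row with key k,
-- so the count stays at <k,1>.
SELECT K, COUNT(*) FROM T GROUP BY K EMIT CHANGES;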

@karno123
Author

Hi @mjsax, thanks for the explanation. Now I understand how tables work in ksqlDB. I have already done some experiments with ksqlDB tables, and I found that sometimes a table contains multiple rows for the same key, but after waiting for a moment only one row per key remains. I guess this is the "log compaction" process removing the old records.
