
How to get window start or end time from kafka topic corresponding to KSQL windowed table? #1497

Closed
ChenShuai1981 opened this Issue Jun 26, 2018 · 14 comments

ChenShuai1981 commented Jun 26, 2018

ksql> select * from DEV_MONITOR_RULE_2557_104782_233_2_TABLE;

1530008581051 | 2557 : Window{start=1530008520000 end=-} | 2557 | 2 | 1530008581051 
./bin/kafka-console-consumer --zookeeper 10.12.0.157:2181 --topic DEV_MONITOR_RULE_2557_104782_233_2_TABLE

{"HITCOUNTS":2,"TENANTID":2557,"HITTIME":1530008581051} 

I want to get the window start timestamp (here, 1530008520000) from the kafka-console-consumer output. How can I achieve this? Thanks!

Also, why are there garbled characters in the message key when I consume and print it? For example: 2557�d;���

 Table Name                                  | Kafka Topic                                 | Format | Windowed 
---------------------------------------------------------------------------------------------------------------
 DEV_MONITOR_RULE_2557_104782_233_2_TABLE    | DEV_MONITOR_RULE_2557_104782_233_2_TABLE    | JSON   | true  

ksql> select ROWKEY from DEV_MONITOR_RULE_2557_104782_233_2_TS_TABLE;
2557 : Window{start=1530008520000 end=-}   

I guess the actual message key is 2557 : Window{start=1530008520000 end=-}, right?

apurvam (Contributor) commented Jun 26, 2018

Hi @ChenShuai1981 , right now there is no way to extract the window start time from the message. Your analysis is correct: the window information is in the key, encoded in a binary format. When you execute SELECT ROWKEY, it is converted to the string that you see.

What we need is a scalar function that parses the start time out of the row key. This should be fairly easy to implement. If such a function were implemented, you would be able to do SELECT WINDOWSTARTTIME(), which would return the window start time as a long.
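Outside of KSQL, the binary key can be decoded by hand. The sketch below is illustrative only and assumes the layout used by Kafka Streams' time-windowed serializer: the serialized inner key followed by the window start as an 8-byte big-endian long (which would also explain the unreadable trailing bytes in the console-consumer key). The function name is my own, not part of any API:

```python
import struct

def extract_window_start(raw_key: bytes):
    """Split a time-windowed changelog key into (inner key, window start ms).

    Assumes the inner key bytes are followed by the window start
    timestamp as an 8-byte big-endian long.
    """
    inner_key = raw_key[:-8]
    (window_start,) = struct.unpack(">q", raw_key[-8:])
    return inner_key, window_start

# Example: a key "2557" with window start 1530008520000 appended.
raw = b"2557" + struct.pack(">q", 1530008520000)
key, start = extract_window_start(raw)
print(key, start)  # b'2557' 1530008520000
```

The stray bytes printed by kafka-console-consumer are simply this trailing 8-byte long rendered as text.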

I'll mark this as an enhancement request.

@apurvam apurvam added the enhancement label Jun 26, 2018

blueedgenick (Contributor) commented Jun 26, 2018

Hey @apurvam - I thought the timestamp of the output message (the one containing the aggregate) got set to the window start time - is that no longer the case?

apurvam (Contributor) commented Jun 26, 2018

@blueedgenick , I think the timestamp of the output message is the timestamp of the message that generated the output. Not the window start time. Will cross check and report back.

blueedgenick (Contributor) commented Jun 27, 2018

Quick test with the 'ratings' dataset from the datagen tool, on 5.0.x:

create stream ratings ( rating_id bigint, user_id int, stars int, rating_time bigint, channel varchar, message varchar) with (kafka_topic = 'ratings', value_format = 'json');

create table windowtest as select user_id, count(message) as message_count from ratings window tumbling(size 30 seconds) group by user_id;

select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss.SSS') as ts, rowkey, user_id, message_count from windowtest where user_id = 2;

2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 1
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 3
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 4
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 5
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 6
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 1
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 2
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 3
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 4
2018-06-27 00:59:00.000 | 2 : Window{start=1530061140000 end=-} | 2 | 2
2018-06-27 00:59:00.000 | 2 : Window{start=1530061140000 end=-} | 2 | 3
2018-06-27 00:59:00.000 | 2 : Window{start=1530061140000 end=-} | 2 | 4
^CQuery terminated
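The output above shows rowtime aligning with the window boundaries. For tumbling windows specifically, the start of the window containing a given record timestamp can always be computed by rounding the timestamp down to a multiple of the window size, a minimal sketch:

```python
def tumbling_window_start(timestamp_ms: int, window_size_ms: int) -> int:
    """Round a record timestamp down to the start of its tumbling window."""
    return timestamp_ms - (timestamp_ms % window_size_ms)

# A record at 00:58:15 falls into the 30-second window starting 00:58:00,
# matching the Window{start=1530061080000} rows above.
print(tumbling_window_start(1530061095000, 30_000))  # 1530061080000
```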

rmoff (Contributor) commented Jun 29, 2018

+1 to what @blueedgenick says, this is certainly what I've always observed and communicated externally. If this is changing please let us know :)

apurvam (Contributor) commented Jul 18, 2018

@hjafarpour can you help get to the bottom of this?

hjafarpour (Member) commented Jul 25, 2018

Currently the timestamp for the windowed table is set to the start of the window as @blueedgenick said.

mjsax (Member) commented Jul 25, 2018

The observation is correct, but this is not intended behavior but actually a bug in Kafka Streams -- we are going to fix this bug in Kafka Streams in the next release (and maybe some bug fix releases). You should not rely on the current behavior but extract the window start timestamp from the key.

mjsax (Member) commented Jul 26, 2018

Fixed in Apache Kafka trunk via: apache/kafka#5423

rmoff (Contributor) commented Jul 27, 2018

@mjsax what will the message timestamp become after this fix? Which version of CP will this be in?

@apurvam @hjafarpour @blueedgenick currently there's no way to extract the timestamp from the key in KSQL, is that right? If so, then this move away (whether a 'bug' or not :) ) from the message timestamp being the window start time could break things. Several demos, including our own (e.g. https://github.com/confluentinc/quickstart-demos/blob/4.1.1-post/mysql-debezium/connector_elasticsearch.config, etc.), use the org.apache.kafka.connect.transforms.InsertField$Value Single Message Transform to copy the message timestamp into the record so that a calculated aggregate can be stored in a target datastore such as Elasticsearch. (/cc @ybyzek )

mjsax (Member) commented Jul 31, 2018

The PR from above is for AK 2.1 / CP 5.1 -- however, we also back-ported it to AK 2.0.1 / CP 5.0.1, AK 1.1.2 / CP 4.1.2, and older releases.

With the current fix, the timestamp will be that of the last record that updated the aggregation result. Because processing order is not guaranteed, the timestamps are assigned non-deterministically and you cannot rely on them at the moment. Note that in AK 2.0 / CP 5.0 there is no official contract that provides any semantic guarantees for the result record timestamps.

We have work in progress to change this and use the maximum timestamp over all records that contributed to the result record. If we can finish this work, it will be part of AK 2.1 / CP 5.1 (and we cannot back-port it). If this happens, we plan to make it a public contract and semantic guarantee, and we will document it accordingly. It's not guaranteed that we can finish the work on timestamp semantics for AK 2.1 / CP 5.1, though. Thus, as long as you don't see anything in the docs, assume that there is no public contract.

mjsax (Member) commented Jul 31, 2018

Even if you cannot extract the timestamp from the key in KSQL itself, a custom SMT should be able to do so using a TimeWindowedSerde or SessionWindowedSerde.
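To make the SMT idea concrete, here is a hypothetical sketch (in Python for readability; a real SMT would implement org.apache.kafka.connect.transforms.Transformation in Java) that pulls the window start out of the raw key and copies it into the JSON value. The trailing-8-byte key layout and the WINDOW_START field name are assumptions for illustration:

```python
import json
import struct

def add_window_start(raw_key: bytes, value_json: str) -> str:
    """SMT-style transform sketch: read the window start (assumed to be
    the trailing 8-byte big-endian long of the windowed key) and insert
    it into the JSON value under a hypothetical WINDOW_START field."""
    (window_start,) = struct.unpack(">q", raw_key[-8:])
    value = json.loads(value_json)
    value["WINDOW_START"] = window_start
    return json.dumps(value)

# Using the record from the original question:
raw_key = b"2557" + struct.pack(">q", 1530008520000)
value = '{"HITCOUNTS":2,"TENANTID":2557,"HITTIME":1530008581051}'
print(add_window_start(raw_key, value))
```

A transform like this would make the window start available to downstream sinks such as Elasticsearch without relying on the message timestamp.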

apurvam (Contributor) commented Jul 31, 2018

@apurvam @hjafarpour @blueedgenick currently there's no way to extract the timestamp from the key in KSQL, is that right?

Today, there isn't a way. However, we should easily be able to add a UDF for this. I filed #1674 to track the feature request.

rmoff (Contributor) commented Sep 3, 2018

Closing to track in #1674.
