Skip to content

Conversation

@chaozwn
Copy link
Member

@chaozwn chaozwn commented Mar 17, 2020

The SQL to reproduce is provided below (提供的 SQL 如下):
-- Kafka source table: card-swipe events with an event-time watermark.
-- NOTE(review): legacy flinkStreamSQL 1.8 DDL dialect — the camelCase keys in
-- the WITH clause are the kafka11 connector's required property names and
-- must not be renamed.
CREATE TABLE datahub_stream (
    card_id  VARCHAR,
    location VARCHAR,
    mytime   TIMESTAMP,
    action   VARCHAR,
    -- event time on mytime, tolerating up to 1000 ms of out-of-orderness
    WATERMARK FOR mytime AS withOffset(mytime, 1000)
) WITH (
    type='kafka11',
    bootstrapServers='hadoop102:9092,hadoop103:9092,hadoop104:9092',
    zookeeperQuorum='hadoop102:2181,hadoop103:2181,hadoop104:2181',
    offsetReset='latest',
    groupId='csin_stream',
    topic='csin_source',
    parallelism='1',
    timezone='Asia/Shanghai'
);

-- Kafka sink table receiving the matched pattern events.
-- NOTE(review): the name "rds_out" suggests an RDS/database sink, but
-- type='kafka11' writes to a Kafka topic — consider renaming for clarity.
CREATE TABLE rds_out (
    start_timestamp TIMESTAMP,
    end_timestamp   TIMESTAMP,
    card_id         VARCHAR,
    event           VARCHAR
) WITH (
    type='kafka11',
    bootstrapServers='hadoop102:9092,hadoop103:9092,hadoop104:9092',
    topic='csin_source2',
    parallelism='1',
    timezone='Asia/Shanghai'
);

-- Flag the same card appearing at two different locations within 10 minutes:
-- PATTERN (e1 e2) matches two consecutive rows per card (event-time order)
-- where both rows have action = 'Tom' but the locations differ.
INSERT INTO rds_out
SELECT
    start_timestamp,
    end_timestamp,
    card_id,
    event
FROM datahub_stream
    MATCH_RECOGNIZE (
        PARTITION BY card_id              -- one independent search per card
        ORDER BY mytime                   -- event time (watermarked column)
        MEASURES
            e2.action AS event,
            e1.mytime AS start_timestamp,
            LAST(e2.mytime) AS end_timestamp
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO NEXT ROW      -- resume one row later: overlapping matches allowed
        PATTERN (e1 e2) WITHIN INTERVAL '10' MINUTE
        DEFINE
            e1 AS e1.action = 'Tom',
            e2 AS e2.action = 'Tom' AND e2.location <> e1.location
    );

@yangsishu yangsishu merged commit ae52175 into DTStack:1.8_release Mar 19, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants