Permalink
Cannot retrieve contributors at this time
37 lines (30 sloc)
1.91 KB
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
demo-scene/twitter-streams-operator/demo.ksql
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SET 'auto.offset.reset' = 'earliest'; | |
-- creates a stream from tweets (and topic) | |
CREATE STREAM twitter_raw ( \ | |
CreatedAt bigint,Id bigint, Text VARCHAR, SOURCE VARCHAR, Truncated VARCHAR, InReplyToStatusId VARCHAR, InReplyToUserId VARCHAR, InReplyToScreenName VARCHAR, GeoLocation VARCHAR, Place VARCHAR, Favorited VARCHAR, Retweeted VARCHAR, FavoriteCount VARCHAR, User VARCHAR, Retweet VARCHAR, Contributors VARCHAR, RetweetCount VARCHAR, RetweetedByMe VARCHAR, CurrentUserRetweetId VARCHAR, PossiblySensitive VARCHAR, Lang VARCHAR, WithheldInCountries VARCHAR, HashtagEntities VARCHAR, UserMentionEntities VARCHAR, MediaEntities VARCHAR, SymbolEntities VARCHAR, URLEntities VARCHAR) \ | |
WITH (KAFKA_TOPIC='twitter_json_01', partitions=12, VALUE_FORMAT='JSON'); | |
-- create stream for particular hashtag | |
CREATE STREAM twitter AS \ | |
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\ | |
EXTRACTJSONFIELD(user,'$.Name') AS user_Name, \ | |
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName, \ | |
EXTRACTJSONFIELD(user,'$.Location') AS user_Location, \ | |
EXTRACTJSONFIELD(user,'$.Description') AS user_Description, \ | |
Text,hashtagentities,lang \ | |
FROM twitter_raw WHERE LCASE(hashtagentities) like '%kafkasummit%'; | |
-- continuous query | |
SELECT USER_NAME, TEXT FROM TWITTER WHERE LCASE(TEXT) LIKE '%gamussa%'; | |
CREATE TABLE user_tweet_count AS \ | |
SELECT user_screenname, count(*) AS tweet_count \ | |
FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \ | |
WHERE LCASE(TEXT) LIKE '%gamussa%' \ | |
GROUP BY user_screenname ; | |
CREATE TABLE USER_TWEET_COUNT_DISPLAY AS \ | |
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START,\ | |
USER_SCREENNAME, \ | |
TWEET_COUNT FROM user_tweet_count; | |
create table more_than_3_tweets_kafkasummit as \ | |
SELECT WINDOW_START, USER_SCREENNAME, TWEET_COUNT \ | |
FROM USER_TWEET_COUNT_DISPLAY \ | |
WHERE TWEET_COUNT > 3; | |
select USER_SCREENNAME, TWEET_COUNT from more_than_3_tweets_kafkasummit; |