💡 This example will show how to aggregate time-series data in real-time using a SESSION window.
The source table (server_logs) is backed by the faker connector, which continuously generates rows in memory based on Java Faker expressions.
In a previous recipe, you learned about tumbling windows. Another way to group time-series data is using session windows, which aggregate records into sessions that represent periods of activity followed by gaps of idleness. Think, for example, of user sessions on a website: a user will be active for a given period of time, then leave the website; and each user will be active at different times. To analyze user behaviour, it's useful to aggregate their actions on the website for each period of activity (i.e. session).
Unlike tumbling windows, session windows don't have a fixed duration and are tracked independently across keys (i.e. windows of different keys will have different durations).
To count the number of "Forbidden" (403) requests per user over the duration of a session, you can use the SESSION built-in group window function. In this example, a session is bounded by a gap of idleness of 10 seconds (INTERVAL '10' SECOND). This means that requests that occur within 10 seconds of the last seen request for each user will be merged into the same session window; and any request that occurs outside of this gap will trigger the creation of a new session window.
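To make the gap rule concrete, here is a minimal Python sketch of the same merging logic, run outside Flink on a handful of made-up events. It is an illustration under stated assumptions, not how Flink implements session windows: the function name sessionize, the sample events, and the use of plain integer seconds are all hypothetical, and the sketch ignores watermarks and late data. How two requests exactly 10 seconds apart are treated is also an assumption here.

```python
from collections import defaultdict

GAP_SECONDS = 10  # session gap, mirroring INTERVAL '10' SECOND


def sessionize(events, gap=GAP_SECONDS):
    """Group (userid, timestamp_in_seconds) pairs into per-user sessions.

    Returns {userid: [(first_ts, last_ts, request_count), ...]}.
    Hypothetical helper for illustration only.
    """
    # Sessions are tracked independently per key, so split by user first.
    by_user = defaultdict(list)
    for userid, ts in events:
        by_user[userid].append(ts)

    sessions = {}
    for userid, stamps in by_user.items():
        stamps.sort()
        user_sessions = []
        start = last = stamps[0]
        count = 1
        for ts in stamps[1:]:
            # Assumption: a request exactly `gap` seconds after the last
            # one still joins the current session.
            if ts - last <= gap:
                last = ts
                count += 1
            else:
                # The idle gap was exceeded: close the session, open a new one.
                user_sessions.append((start, last, count))
                start = last = ts
                count = 1
        user_sessions.append((start, last, count))
        sessions[userid] = user_sessions
    return sessions


# Hypothetical 403 requests: (userid, seconds since an arbitrary origin).
events = [
    ("knauf", 0),
    ("knauf", 4),
    ("morsapaes", 5),
    ("knauf", 30),
    ("knauf", 33),
]
result = sessionize(events)
```

With these sample events, knauf ends up with two sessions of two requests each, because 26 seconds of idleness separate the requests at 4 and 30, while morsapaes gets a single one-request session. The sessions of the two users have different bounds, which is the per-key behaviour described above.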
Tip: You can use the SESSION_START and SESSION_ROWTIME auxiliary functions to check the lower and upper bounds of session windows.
CREATE TABLE server_logs (
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    log_time TIMESTAMP(3),
    request_line STRING,
    status_code STRING,
    size INT,
    -- Event-time attribute: tolerate records arriving up to 5 seconds out of order
    WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS
) WITH (
  'connector' = 'faker',
  'rows-per-second' = '5',
  'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
  'fields.client_identity.expression' = '-',
  'fields.userid.expression' = '#{regexify ''(morsapaes|knauf|sjwiesman){1}''}',
  'fields.log_time.expression' = '#{date.past ''5'',''SECONDS''}',
  'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|/cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|HTTP/1\.0){1}''}',
  'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
  'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
SELECT
  userid,
  SESSION_START(log_time, INTERVAL '10' SECOND) AS session_beg,
  SESSION_ROWTIME(log_time, INTERVAL '10' SECOND) AS session_end,
  COUNT(request_line) AS request_cnt
FROM server_logs
WHERE status_code = '403'
GROUP BY
  userid,
  SESSION(log_time, INTERVAL '10' SECOND);