Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Sessionize funtion - Phase 2 #49

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
138 changes: 97 additions & 41 deletions src/ports/postgres/modules/utilities/sessionize.py_in
Expand Up @@ -17,16 +17,17 @@

import plpy
import string
import re

from control import MinWarning
from utilities import unique_string, _assert
from utilities import unique_string, _assert, split_quoted_delimited_str
from validate_args import get_cols
from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid

m4_changequote(`<!', `!>')

def sessionize(schema_madlib, source_table, output_table, partition_expr,
time_stamp, max_time, **kwargs):
time_stamp, max_time, output_cols=None, create_view=None, **kwargs):
"""
Perform sessionization over a sequence of rows.

Expand All @@ -35,41 +36,74 @@ def sessionize(schema_madlib, source_table, output_table, partition_expr,
@param source_table: str, Name of the input table/view
@param output_table: str, Name of the table to store result
@param partition_expr: str, Expression to partition (group) the input data
@param time_stamp: str, Column name with time used for sessionization calculation
@param time_stamp: str, The time stamp column name that is used for sessionization calculation
@param max_time: interval, Delta time between subsequent events to define a session

@param output_cols: str, a valid postgres SELECT expression
@param create_view: boolean, indicates if the output is a view or a table with name
specified by output_table (default TRUE):
TRUE - create view
FALSE - materialize results into a table
"""
with MinWarning("error"):
_validate(source_table, output_table, partition_expr, time_stamp, max_time)

all_input_cols_str = ', '.join([i.strip() for i in get_cols(source_table, schema_madlib)])
session_id = 'session_id' if not is_var_valid(source_table, 'session_id') else unique_string('session_id')
table_or_view = 'VIEW' if create_view or create_view is None else 'TABLE'
output_cols = '*' if output_cols is None else output_cols

# If the output_cols has '*' as one of the elements, expand it to
# include all columns in the source table. The following list
# comprehension is only to handle the case where '*' is included
# in output_cols. Using '*' as is, without expanding it to specific
# column names leads to some temporary intermediate columns
# (new_partition and new_session defined below) occurring in the output.
cols_to_project_list = [', '.join(get_cols(source_table, schema_madlib)) if i=='*' else i
for i in split_quoted_delimited_str(output_cols)]

# Examples of Invalid SELECT expression in output_cols:
# 1) If output_cols contains '*' along with an existing column name
# in the source table, postgres will throw an error and fail
# for specifying duplicate column names in the output table/view.
# 2) If output_cols contains more than 1 expressions which are not
# renamed using ' AS ', postgres will fail since it will try to
# rename all such new columns as '?column?'. This is considered an
# invalid SELECT expression.
cols_to_project = ', '.join(cols_to_project_list)

session_id = 'session_id' if not is_var_valid(source_table, 'session_id')\
else unique_string('session_id')

# Create temp column names for intermediate columns.
new_partition = unique_string('new_partition')
new_session = unique_string('new_session')

plpy.execute("""
CREATE TABLE {output_table} AS
SELECT
{all_input_cols_str},
CASE WHEN {time_stamp} IS NOT NULL
THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END)
OVER (PARTITION BY {partition_expr}
ORDER BY {time_stamp})
END AS {session_id}
FROM (
SELECT *,
ROW_NUMBER() OVER (w) = 1
AND {time_stamp} IS NOT NULL AS {new_partition},
({time_stamp} - LAG({time_stamp}, 1)
OVER (w)) > '{max_time}'::INTERVAL AS {new_session}
FROM {source_table}
WINDOW w AS (PARTITION BY {partition_expr}
ORDER BY {time_stamp})
) a
""".format(**locals()))

try:
plpy.execute("""
CREATE {table_or_view} {output_table} AS
SELECT
{cols_to_project},
CASE WHEN {time_stamp} IS NOT NULL
THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END)
OVER (PARTITION BY {partition_expr} ORDER BY {time_stamp})
END AS {session_id}
FROM (
SELECT *,
ROW_NUMBER() OVER (w) = 1 AND {time_stamp} IS NOT NULL AS {new_partition},
({time_stamp}-LAG({time_stamp}, 1) OVER (w)) > '{max_time}'::INTERVAL AS {new_session}
FROM {source_table} WINDOW w AS (PARTITION BY {partition_expr} ORDER BY {time_stamp})
) a
""".format(**locals()))
except plpy.SPIError as e:
# The specific exception we want to catch here is
# "spiexceptions.DuplicateColumn". But the current version of gpdb
# does not seem to have implemented it. So catching a more generic
# exception and displaying this warning message. The reason for
# doing this is that the default error message shown by postgres
# when we have more than one expressions in output_cols that do
# not use ' AS ' to rename them is not user-friendly.
with MinWarning("warning"):
plpy.warning("A plausible error condition: the output_cols\
parameter might be an invalid SELECT expression, resulting\
in duplicate column names.")
raise

def _validate(source_table, output_table, partition_expr, time_stamp, max_time):
input_tbl_valid(source_table, 'Sessionization')
Expand All @@ -80,8 +114,7 @@ def _validate(source_table, output_table, partition_expr, time_stamp, max_time):
_assert(max_time, "Sessionization error: Invalid max time value")
# ensure the partition/order expression can actually be used
_assert(is_var_valid(source_table, partition_expr, time_stamp),
"Sessionization error: invalid partition expression or time stamp column name")

"Sessionization error: Invalid partition expression or time stamp column name")

def sessionize_help_message(schema_madlib, message, **kwargs):
"""
Expand All @@ -94,17 +127,19 @@ def sessionize_help_message(schema_madlib, message, **kwargs):
Functionality: Sessionize

The goal of the MADlib sessionize function is to perform sessionization over
a time-series based data.
a time-series based data.

------------------------------------------------------------
USAGE
------------------------------------------------------------
SELECT {schema_madlib}.sessionize(
'source_table', -- str, Name of the table
'output_table', -- str, Table name to store the Sessionization results
'partition_expr', -- str, Partition expression to group the data table
'time_stamp' -- str, Column name with time used for sessionization calculation
'max_time' -- str, Delta time between subsequent events to define a session
'source_table', -- str, Name of the table
'output_table', -- str, Table name to store the Sessionization results
'partition_expr', -- str, Partition expression to group the data table
'time_stamp' -- str, The time stamp column name that is used for sessionization calculation
'max_time' -- str, Delta time between subsequent events to define a session
'output_cols' -- str, an optional valid postgres SELECT expression for the output table/view (default *)
'create_view' -- boolean, optional parameter to specify if output is a view or materilized to a table (default True)
);

------------------------------------------------------------
Expand Down Expand Up @@ -171,19 +206,40 @@ def sessionize_help_message(schema_madlib, message, **kwargs):
'04/15/2016 02:19:00'|103711|109|'WINE'|0|1
\.

- Sessionize the table for each user_id:
- Sessionize the table for each user_id, and obtain only the user_id, with partition expression,
event_timestamp and session_id:

SELECT {schema_madlib}.sessionize(
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id', -- Partition input table by session
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id', -- Partition input table by session
'event_timestamp', -- Order partitions in input table by time
'0:3:0' -- Events within a window of this time unit (180 seconds) must be in the same session
'0:3:0' -- Events within a window of this time unit (180 seconds) must be in the same session
);

- View the output table containing the session IDs:

SELECT * FROM sessionize_output;

DROP VIEW sessionize_output;

- Sessionize the table for each user_id, and materialize all columns from source table into an output table:
SELECT {schema_madlib}.sessionize(
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id', -- Partition input table by session
'event_timestamp', -- Order partitions in input table by time
'180' -- Events within a window of this time unit (180 seconds) must be in the same session
'user_id, event_timestamp' -- Preseve only user_id and event_timestamp columns, along with the session id column
'false' -- Materialize results into a table, and not a view
);

- View the output table containing the session IDs:

SELECT eventlog.*, sessionize_output.session_id FROM eventlog INNER JOIN sessionize_output ON
(eventlog.user_id=sessionize_output.user_id AND eventlog.event_timestamp=sessionize_output.event_timestamp);

DROP TABLE sessionize_output;
"""

return help_string.format(schema_madlib=schema_madlib)