Feature: Sessionize function - Phase 2 #49

Closed
wants to merge 6 commits into from
127 changes: 96 additions & 31 deletions src/ports/postgres/modules/utilities/sessionize.py_in
@@ -17,16 +17,17 @@

import plpy
import string
import re

from control import MinWarning
from utilities import unique_string, _assert
from utilities import unique_string, _assert, split_quoted_delimited_str
from validate_args import get_cols
from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid

m4_changequote(`<!', `!>')

def sessionize(schema_madlib, source_table, output_table, partition_expr,
time_stamp, max_time, **kwargs):
time_stamp, max_time, output_cols=None, create_view=None, **kwargs):
"""
Perform sessionization over a sequence of rows.

@@ -35,41 +36,83 @@ def sessionize(schema_madlib, source_table, output_table, partition_expr,
@param source_table: str, Name of the input table/view
@param output_table: str, Name of the table to store result
@param partition_expr: str, Expression to partition (group) the input data
@param time_stamp: str, Column name with time used for sessionization calculation
@param time_stamp: str, The time stamp column name that is used for sessionization calculation
@param max_time: interval, Delta time between subsequent events to define a session

@param output_cols: str, list of columns the output table/view must contain (default '*'):
* - all columns in the input table, and a new session ID column
'a,b,c,...' - a comma separated list of column names/expressions to be projected, along with a new session ID column

@decibel decibel Jun 21, 2016

Would it be better to accept a generator if someone wanted to list column names? That certainly seems more pythonic than a string. The SQL equivalent would be passing in an array.

In the past, I have created a SQL function that would accept text that was either valid to cast directly to an array (ie: '{a,text,array}'), or simply a comma delimited list that would get turned into an array.
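
The dual-format input described above (an array-style literal or a plain comma-delimited list) could be sketched in Python roughly as follows. `parse_column_list` is a hypothetical helper, not part of MADlib, and the naive split assumes no commas appear inside quoted identifiers.

```python
def parse_column_list(text):
    """Return column names from '{a,b,c}' or 'a,b,c' (hypothetical helper)."""
    stripped = text.strip()
    # Treat a brace-wrapped string as an array-style literal and unwrap it.
    if stripped.startswith('{') and stripped.endswith('}'):
        stripped = stripped[1:-1]
    # Naive split: does NOT handle commas inside quoted identifiers.
    return [item.strip() for item in stripped.split(',') if item.strip()]


print(parse_column_list('{a,text,array}'))
print(parse_column_list('user_id, event_timestamp'))
```

Both calls return a plain Python list, so the rest of the function would not need to care which form the caller used.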

Contributor Author

I believe we had precedent in other MADlib functions, and hence chose to go with the comma separated string to specify output_cols.
One potential modification could be to ask the user to specify a valid SELECT expression as output_cols, with the constraint that expressions must be renamed using AS. For instance, output_cols would be something like the following:
'*, "user id"<100 AS uid_100, revenue>20 AS rev_20'
instead of its current form, which is:
'*, "user id"<100, revenue>20'

We will have to decide if this is too hard a constraint to have or not. Having this constraint will take away all the messy string parsing stuff we currently have implemented though.

> We will have to decide if this is too hard a constraint to have or not.
> Having this constraint will take away all the messy string parsing stuff
> we currently have implemented though.

Yeah, I didn't realize what the code was doing. I think it would be nice
to allow the user to choose different output names if they want.

Perhaps a good compromise would be to detect ' AS ' in the string and
then treat it as a raw select clause. Another option would be to treat
an array as a list of columns and anything else as a select clause
(which would also support the * case, I think).
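
A rough sketch of the compromise suggested above, assuming a hypothetical helper named `classify_output_cols`: a Python list is treated as column names, a string containing the keyword AS is passed through as a raw select clause, and any other string is split on commas. The AS check is deliberately simple and would misfire on an " AS " inside a quoted identifier or string literal.

```python
import re

# Matches the keyword AS surrounded by whitespace, in any letter case.
_AS_KEYWORD = re.compile(r'\sAS\s', re.IGNORECASE)


def classify_output_cols(output_cols):
    """Return ('columns', [names]) or ('select_clause', text)."""
    if isinstance(output_cols, (list, tuple)):
        # A sequence is always treated as plain column names.
        return 'columns', [col.strip() for col in output_cols]
    if _AS_KEYWORD.search(output_cols):
        # The caller supplied aliases, so use the text verbatim.
        return 'select_clause', output_cols.strip()
    # Naive split: assumes no commas inside quoted identifiers.
    return 'columns', [col.strip() for col in output_cols.split(',')]


print(classify_output_cols(['user_id', 'revenue']))
print(classify_output_cols('"user id"<100 AS uid_100, revenue>20 AS rev_20'))
```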

Contributor Author

Processing strings will be error prone and adds a lot of complexity. Nevertheless, this might be
a good feature to have. I will leave this as is for Phase 2 and look at it in Phase 3.
I have added a comment in the Phase 3 JIRA (https://issues.apache.org/jira/browse/MADLIB-1002).

@param create_view: boolean, indicates if the output is a view or a table with name specified by output_table (default TRUE)
TRUE - create view
FALSE - materialize results into a table
"""
with MinWarning("error"):
_validate(source_table, output_table, partition_expr, time_stamp, max_time)
table_or_view = 'VIEW' if create_view or create_view is None else 'TABLE'
output_cols_to_project = '*' if output_cols is None else output_cols

all_input_cols_str = ', '.join([i.strip() for i in get_cols(source_table, schema_madlib)])
cols_to_project = get_column_names(schema_madlib, source_table, output_cols_to_project)
session_id = 'session_id' if not is_var_valid(source_table, 'session_id') else unique_string('session_id')

# Create temp column names for intermediate columns.
new_partition = unique_string('new_partition')
new_session = unique_string('new_session')

plpy.execute("""
CREATE TABLE {output_table} AS
CREATE {table_or_view} {output_table} AS
SELECT
{all_input_cols_str},
{cols_to_project},
CASE WHEN {time_stamp} IS NOT NULL
THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END)
OVER (PARTITION BY {partition_expr}
ORDER BY {time_stamp})
END AS {session_id}
THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END) OVER (PARTITION BY {partition_expr} ORDER BY {time_stamp}) END AS {session_id}
FROM (
SELECT *,
ROW_NUMBER() OVER (w) = 1
AND {time_stamp} IS NOT NULL AS {new_partition},
({time_stamp} - LAG({time_stamp}, 1)
OVER (w)) > '{max_time}'::INTERVAL AS {new_session}
FROM {source_table}
WINDOW w AS (PARTITION BY {partition_expr}
ORDER BY {time_stamp})
ROW_NUMBER() OVER (w) = 1 AND {time_stamp} IS NOT NULL AS {new_partition},
({time_stamp}-LAG({time_stamp}, 1) OVER (w)) > '{max_time}'::INTERVAL AS {new_session}
FROM {source_table} WINDOW w AS (PARTITION BY {partition_expr} ORDER BY {time_stamp})
) a
""".format(**locals()))

def get_column_names(schema_madlib, source_table, output_cols):
"""
This method creates a string that can be used in the SQL statement to project the columns specified in the output_cols parameter.

Return:
a string to be used in the SQL statement
"""
table_columns_list = get_cols(source_table, schema_madlib)
if output_cols.strip() == '*':
output_cols_str = get_cols_str(table_columns_list)
else:
output_cols_list, output_cols_names = get_columns_from_expression(output_cols, table_columns_list)
_validate_output_cols(source_table, output_cols_list)
output_cols_str = ', '.join([output_cols_names[i] if output_cols_list[i] == '*' else output_cols_list[i] + ' AS ' + output_cols_names[i]
for i in range(len(output_cols_list))])
return output_cols_str

def create_column_name_from_expression(col_name, table_columns_list):

Since this returns a list of columns, I think it would be better to call it something like column_names_from_expression

Contributor Author

Will rename this and other functions too. It seems to be causing a lot of confusion with existing
utility functions used for parsing.

if col_name == '*':
return get_cols_str(table_columns_list)
else:
# Column name cannot have more than one pair of quotes in it. Removing any existing quotes, and then quoting the new string obtained.

This isn't going to work. One immediate test case that comes to mind is "column with spaces in the name".

If you wanted to do something around quoting, I think you need to do everything that Postgres quote_ident() does.

How is this handled elsewhere in MADlib? What happens if you have "a table with spaces in the name"?
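
For reference, the core of identifier quoting can be sketched in a few lines of Python. This is a simplified stand-in, not the real thing: Postgres `quote_ident()` adds quotes only when they are needed, whereas the hypothetical `quote_ident_always` below always quotes. What it does share with `quote_ident()` is that embedded double quotes are doubled rather than removed.

```python
def quote_ident_always(name):
    """Wrap name in double quotes, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


print(quote_ident_always('column with spaces in the name'))
print(quote_ident_always('odd"name'))
```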

While looking at #47, I noticed that there are already column parsing utilities. See https://github.com/apache/incubator-madlib/pull/47/files#diff-96a0a5136d71ec812cc09b1d3a2e259aR35.

Contributor Author

I am not sure I understand the comment exactly. This method does take care of column names with spaces in the name. Say I had a column name called "user id" (note that it is quoted, since postgres does not let me name a column with spaces without quotes), and another column called revenue.
If my output_cols parameter was:
' "user id"<100, revenue>20 '

I parse this to rename each expression:

  • "user id"<100 is named as "user id<100"
  • revenue>20 is named as "revenue>20"

This ends up creating a select statement as follows:
SELECT "user id"<100 AS "user id<100", revenue>20 AS "revenue>20"

All I am doing in this function is creating a new column name for an expression. I quote the expression to create a new column name, but before doing that, I remove any existing quotes from the expression (because postgres does not let me have quotes inside a quoted column name).

I checked again to make sure this is indeed how it's working. I will put up some install checks to cover these cases too.
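
The renaming described above can be reproduced outside the database with a small standalone sketch. The helper names `alias_for_expression` and `project` are hypothetical, and the sketch leaves out the `'*'` case and the `unique_string` collision handling that the diff performs.

```python
def alias_for_expression(expression):
    """Derive a quoted output column name from a SQL expression."""
    # Drop existing double quotes, then quote the whole expression once.
    return '"{0}"'.format(expression.strip().replace('"', ''))


def project(expressions):
    """Build the 'expr AS alias' list for a SELECT statement."""
    return ', '.join(
        '{0} AS {1}'.format(expr.strip(), alias_for_expression(expr))
        for expr in expressions
    )


print('SELECT ' + project(['"user id"<100', 'revenue>20']))
# prints: SELECT "user id"<100 AS "user id<100", revenue>20 AS "revenue>20"
```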

> • "user id"<100 is named as "user id<100"
> • revenue>20 is named as "revenue>20"

Ahh, ok. That was completely unclear to me. It would be good to add
comments/docstrings to the various functions explaining what they're doing.

Contributor

+1.
I suggest also adding the above examples to the docstrings.

col_name = col_name.replace('"','')
col_name = '"'+col_name + '"'
if col_name in table_columns_list:
return unique_string(col_name)
else:
return col_name

def get_cols_str(table_columns_list):
return ', '.join([i for i in table_columns_list])

def get_columns_from_expression(output_cols, table_columns_list):
cols_list = split_quoted_delimited_str(output_cols)
cols_names = [i if i in table_columns_list else create_column_name_from_expression(i, table_columns_list) for i in cols_list]
return cols_list, cols_names

def _validate_output_cols(source_table, output_cols_list):
null_regex = re.compile('^null$', re.IGNORECASE)
for col in output_cols_list:
_assert(is_var_valid(source_table, col), "Sessionization error: Invalid output column name: " + col)
_assert(True if null_regex.search(col) is None else False, "Sessionization error: Output column name cannot be " + col)

def _validate(source_table, output_table, partition_expr, time_stamp, max_time):
input_tbl_valid(source_table, 'Sessionization')
@@ -80,8 +123,7 @@ def _validate(source_table, output_table, partition_expr, time_stamp, max_time):
_assert(max_time, "Sessionization error: Invalid max time value")
# ensure the partition/order expression can actually be used
_assert(is_var_valid(source_table, partition_expr, time_stamp),
"Sessionization error: invalid partition expression or time stamp column name")

"Sessionization error: Invalid partition expression or time stamp column name")

def sessionize_help_message(schema_madlib, message, **kwargs):
"""
@@ -94,17 +136,19 @@ def sessionize_help_message(schema_madlib, message, **kwargs):
Functionality: Sessionize

The goal of the MADlib sessionize function is to perform sessionization over
a time-series based data.
a time-series based data.

------------------------------------------------------------
USAGE
------------------------------------------------------------
SELECT {schema_madlib}.sessionize(
'source_table', -- str, Name of the table
'output_table', -- str, Table name to store the Sessionization results
'partition_expr', -- str, Partition expression to group the data table
'time_stamp' -- str, Column name with time used for sessionization calculation
'max_time' -- str, Delta time between subsequent events to define a session
'source_table', -- str, Name of the table
'output_table', -- str, Table name to store the Sessionization results
'partition_expr', -- str, Partition expression to group the data table
'time_stamp', -- str, The time stamp column name that is used for sessionization calculation
'max_time', -- str, Delta time between subsequent events to define a session
'output_cols', -- str, an optional comma separated list of columns to be projected in the output table/view (default *)
'create_view' -- boolean, optional parameter to specify if output is a view or materialized to a table (default True)
);

------------------------------------------------------------
@@ -171,19 +215,40 @@ def sessionize_help_message(schema_madlib, message, **kwargs):
'04/15/2016 02:19:00'|103711|109|'WINE'|0|1
\.

- Sessionize the table for each user_id:
- Sessionize the table for each user_id, and create a view containing all columns of the
source table along with the session_id:

SELECT {schema_madlib}.sessionize(
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id', -- Partition input table by session
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id', -- Partition input table by session
'event_timestamp', -- Order partitions in input table by time
'0:3:0' -- Events within a window of this time unit (180 seconds) must be in the same session
'0:3:0' -- Events within a window of this time unit (180 seconds) must be in the same session
);

- View the output table containing the session IDs:

SELECT * FROM sessionize_output;

DROP VIEW sessionize_output;

- Sessionize the table for each user_id, and materialize only the user_id and event_timestamp columns, along with the session_id, into an output table:
SELECT {schema_madlib}.sessionize(
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id', -- Partition input table by session
'event_timestamp', -- Order partitions in input table by time
'180', -- Events within a window of this time unit (180 seconds) must be in the same session
'user_id, event_timestamp', -- Preserve only user_id and event_timestamp columns, along with the session id column
'false' -- Materialize results into a table, and not a view
);

- View the output table containing the session IDs:

SELECT eventlog.*, sessionize_output.session_id FROM eventlog INNER JOIN sessionize_output ON
(eventlog.user_id=sessionize_output.user_id AND eventlog.event_timestamp=sessionize_output.event_timestamp);

DROP TABLE sessionize_output;
"""

return help_string.format(schema_madlib=schema_madlib)
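
The windowed query in `sessionize` above can be illustrated in plain Python. This is a minimal sketch of the intended semantics, not the MADlib implementation: `assign_sessions` is a hypothetical name, events are assumed to be `(partition_key, timestamp)` pairs, and NULL timestamps (which the SQL maps to a NULL session ID) are not handled.

```python
from datetime import datetime, timedelta
from itertools import groupby
from operator import itemgetter


def assign_sessions(events, max_gap):
    """Yield (partition_key, timestamp, session_id) tuples.

    session_id restarts at 1 in each partition and increases whenever the
    gap to the previous event in that partition exceeds max_gap.
    """
    # Equivalent of PARTITION BY key ORDER BY timestamp.
    ordered = sorted(events, key=itemgetter(0, 1))
    for key, group in groupby(ordered, key=itemgetter(0)):
        session_id = 0
        previous = None
        for _, stamp in group:
            # First row of a partition, or a gap larger than max_gap.
            if previous is None or stamp - previous > max_gap:
                session_id += 1
            previous = stamp
            yield key, stamp, session_id


start = datetime(2016, 4, 15, 1, 0, 0)
log = [
    (1, start),
    (1, start + timedelta(minutes=2)),
    (1, start + timedelta(minutes=6)),
    (2, start),
]
for row in assign_sessions(log, timedelta(minutes=3)):
    print(row)
```

With a 3-minute gap, the third event for key 1 arrives 4 minutes after the second and therefore starts session 2, while key 2 starts again at session 1.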