-
Notifications
You must be signed in to change notification settings - Fork 145
/
sessionize.py_in
242 lines (216 loc) · 12.5 KB
/
sessionize.py_in
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy
import string
import re
from control import MinWarning
from utilities import unique_string, _assert, split_quoted_delimited_str
from validate_args import get_cols
from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid
m4_changequote(`<!', `!>')
def sessionize(schema_madlib, source_table, output_table, partition_expr,
               time_stamp, max_time, output_cols=None, create_view=None, **kwargs):
    """
    Perform sessionization over a sequence of rows.

    Creates output_table as a view (default) or a table that contains the
    projected columns plus a session id column. Within each partition, rows
    are ordered by time_stamp; a new session starts at the first row of the
    partition and whenever the gap to the previous row exceeds max_time.
    Rows whose time_stamp is NULL get a NULL session id.

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param source_table: str, Name of the input table/view
        @param output_table: str, Name of the table to store result
        @param partition_expr: str, Expression to partition (group) the input data
        @param time_stamp: str, The time stamp column name that is used for
                           sessionization calculation
        @param max_time: interval, Delta time between subsequent events to
                         define a session
        @param output_cols: str, a valid postgres SELECT expression
                            (default None, treated as '*')
        @param create_view: boolean, indicates if the output is a view or a
                            table with name specified by output_table
                            (default TRUE)
                                TRUE - create view
                                FALSE - materialize results into a table

    Raises:
        plpy.SPIError: re-raised unchanged when the CREATE statement fails,
                       after a warning hinting at an invalid output_cols
                       expression has been emitted.
    """
    with MinWarning("error"):
        _validate(source_table, output_table, partition_expr, time_stamp, max_time)

        # None (argument not supplied) is treated the same as TRUE.
        table_or_view = 'VIEW' if create_view or create_view is None else 'TABLE'
        output_cols = '*' if output_cols is None else output_cols

        # If the output_cols has '*' as one of the elements, expand it to
        # include all columns in the source table. The following list
        # comprehension is only to handle the case where '*' is included
        # in output_cols. Using '*' as is, without expanding it to specific
        # column names leads to some temporary intermediate columns
        # (new_partition and new_session defined below) occurring in the output.
        cols_to_project_list = [
            ', '.join(get_cols(source_table, schema_madlib)) if col == '*' else col
            for col in split_quoted_delimited_str(output_cols)]

        # Examples of Invalid SELECT expression in output_cols:
        # 1) If output_cols contains '*' along with an existing column name
        #    in the source table, postgres will throw an error and fail
        #    for specifying duplicate column names in the output table/view.
        # 2) If output_cols contains more than 1 expressions which are not
        #    renamed using ' AS ', postgres will fail since it will try to
        #    rename all such new columns as '?column?'. This is considered an
        #    invalid SELECT expression.
        cols_to_project = ', '.join(cols_to_project_list)

        # Fall back to a generated name when the plain 'session_id' check
        # against the source table succeeds (i.e. the name is already usable
        # there), so the new column does not clash with an existing one.
        session_id = ('session_id'
                      if not is_var_valid(source_table, 'session_id')
                      else unique_string('session_id'))

        # Create temp column names for intermediate columns.
        new_partition = unique_string('new_partition')
        new_session = unique_string('new_session')

        # The inner query flags rows that open a partition or a session; the
        # outer query turns the running count of those flags into a session id.
        sessionize_sql = """
            CREATE {table_or_view} {output_table} AS
                SELECT
                    {cols_to_project},
                    CASE WHEN {time_stamp} IS NOT NULL
                         THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END) OVER (PARTITION BY {partition_expr} ORDER BY {time_stamp}) END AS {session_id}
                FROM (
                    SELECT *,
                        ROW_NUMBER() OVER (w) = 1 AND {time_stamp} IS NOT NULL AS {new_partition},
                        ({time_stamp}-LAG({time_stamp}, 1) OVER (w)) > '{max_time}'::INTERVAL AS {new_session}
                    FROM {source_table} WINDOW w AS (PARTITION BY {partition_expr} ORDER BY {time_stamp})
                    ) a
            """.format(table_or_view=table_or_view,
                       output_table=output_table,
                       cols_to_project=cols_to_project,
                       time_stamp=time_stamp,
                       new_partition=new_partition,
                       new_session=new_session,
                       partition_expr=partition_expr,
                       session_id=session_id,
                       max_time=max_time,
                       source_table=source_table)
        try:
            plpy.execute(sessionize_sql)
        except plpy.SPIError:
            # The specific exception we want to catch here is
            # "spiexceptions.DuplicateColumn". But the current version of gpdb
            # does not seem to have implemented it. So catching a more generic
            # exception and displaying this warning message. The reason for
            # doing this is that the default error message shown by postgres
            # when we have more than one expressions in output_cols that do
            # not use ' AS ' to rename them is not user-friendly.
            with MinWarning("warning"):
                # Adjacent literals (not backslash continuations inside the
                # string) keep the message independent of source indentation.
                plpy.warning("A plausible error condition: the output_cols "
                             "parameter might be an invalid SELECT expression, "
                             "resulting in duplicate column names.")
                # Re-raise the original database error for the caller.
                raise
def _validate(source_table, output_table, partition_expr, time_stamp, max_time):
    """
    Check the sessionize arguments before any SQL is generated.

    Args:
        @param source_table: str, Name of the input table/view
        @param output_table: str, Name of the table/view to be created
        @param partition_expr: str, Expression used to partition the input
        @param time_stamp: str, Time stamp column used for ordering
        @param max_time: interval, Maximum gap between events in one session

    Failures are reported by the imported validation helpers
    (input_tbl_valid, output_tbl_valid, _assert); presumably they raise an
    error -- confirm against the utilities/validate_args modules.
    """
    module_name = 'Sessionization'
    input_tbl_valid(source_table, module_name)
    output_tbl_valid(output_table, module_name)

    # Each of these must be neither None nor an empty string.
    required_args = (
        (partition_expr, "Sessionization error: Invalid partition expression"),
        (time_stamp, "Sessionization error: Invalid time stamp column"),
        (max_time, "Sessionization error: Invalid max time value"),
    )
    for arg_value, error_msg in required_args:
        _assert(arg_value, error_msg)

    # The partition/order expressions must actually be usable on the source.
    exprs_usable = is_var_valid(source_table, partition_expr, time_stamp)
    _assert(exprs_usable,
            "Sessionization error: Invalid partition expression or time stamp column name")
def sessionize_help_message(schema_madlib, message, **kwargs):
    """
    Help message for sessionize function

    Args:
        @param schema_madlib: str, Name of the MADlib schema; substituted into
                              the SQL snippets of the help text
        @param message: accepted for interface compatibility but not used;
                        the same full help text is returned for any value

    Returns:
        str, the help text (summary, usage and example sections)
    """
    help_string = """
------------------------------------------------------------
                        SUMMARY
------------------------------------------------------------
Functionality: Sessionize

The goal of the MADlib sessionize function is to perform sessionization over
a time-series based data.

------------------------------------------------------------
                        USAGE
------------------------------------------------------------
SELECT {schema_madlib}.sessionize(
    'source_table',     -- str, Name of the table
    'output_table',     -- str, Table name to store the Sessionization results
    'partition_expr',   -- str, Partition expression to group the data table
    'time_stamp',       -- str, The time stamp column name that is used for sessionization calculation
    'max_time',         -- str, Delta time between subsequent events to define a session
    'output_cols',      -- str, an optional valid postgres SELECT expression for the output table/view (default *)
    'create_view'       -- boolean, optional parameter to specify if output is a view or materialized to a table (default True)
);

------------------------------------------------------------
                        EXAMPLE
------------------------------------------------------------
- Create an input data set

DROP TABLE IF EXISTS eventlog;
CREATE TABLE eventlog (event_timestamp TIMESTAMP,
            user_id INT,
            old_session_id INT,
            page TEXT,
            revenue FLOAT,
            row INT);
COPY eventlog (event_timestamp, user_id, old_session_id,
            page, revenue, row) FROM stdin delimiter '|';
'04/15/2015 01:03:0.5'|100821|100|'LANDING'|0|1
'04/15/2015 01:03:50'|100821|100|'WINE'|0|1
'04/15/2015 01:04:10'|100821|100|'CHECKOUT'|39|1
'04/15/2015 01:04:15'|100821|101|'WINE'|0|1
'04/15/2015 01:05:00'|100821|100|'WINE'|0|1
'04/15/2015 01:07:00'|100821|100|'CHECKOUT'|39|1
'04/15/2015 02:06:00'|100821|101|'WINE'|0|1
'04/15/2015 02:06:10'|100821|101|'WINE'|0|1
'04/15/2015 02:06:20'|100821|101|'WINE'|0|1
'04/15/2015 02:06:30'|100821|101|'WINE'|0|1
'04/15/2015 02:07:00'|100821|101|'WINE'|0|1
'04/15/2015 01:15:00'|101121|102|'LANDING'|0|1
'04/15/2015 01:16:00'|101121|102|'WINE'|0|1
'04/15/2015 01:18:00'|101121|102|'CHECKOUT'|15|1
'04/15/2015 01:19:00'|101121|102|'LANDING'|0|1
'04/15/2015 01:21:00'|101121|102|'HELP'|0|1
NULL|101121|102|'LANDING'|0|1
NULL|101121|102|'HELP'|0|1
'04/15/2015 01:24:00'|101121|102|'WINE'|0|1
'04/15/2015 01:26:00'|101121|102|'CHECKOUT'|23|1
'04/15/2015 02:21:00'|101121|102|'HELP'|0|1
'04/15/2015 02:24:00'|101121|102|'WINE'|0|1
'04/15/2015 02:26:00'|101121|102|'CHECKOUT'|23|1
'04/15/2015 02:15:00'|101331|103|'LANDING'|0|1
'04/15/2015 02:16:00'|101331|103|'WINE'|0|1
'04/15/2015 02:18:00'|101331|103|'HELP'|0|1
'04/15/2015 02:20:00'|101331|103|'WINE'|0|1
'04/15/2015 02:21:00'|101331|103|'CHECKOUT'|16|1
'04/15/2015 02:22:00'|101443|104|'BEER'|0|1
'04/15/2015 02:25:00'|101443|104|'CHECKOUT'|12|1
'04/15/2015 02:29:00'|101881|105|'LANDING'|0|1
'04/15/2015 02:30:00'|101881|105|'BEER'|0|1
'04/15/2015 01:05:00'|102201|106|'LANDING'|0|1
'04/15/2015 01:06:00'|102201|106|'HELP'|0|1
'04/15/2015 01:09:00'|102201|106|'LANDING'|0|1
'04/15/2015 02:15:00'|102201|107|'WINE'|0|1
'04/15/2015 02:16:00'|102201|107|'BEER'|0|1
'04/15/2015 02:17:00'|102201|107|'WINE'|0|1
'04/15/2015 02:18:00'|102871|108|'BEER'|0|1
'04/15/2015 02:19:00'|102871|108|'WINE'|0|1
'04/15/2015 02:22:00'|102871|108|'CHECKOUT'|21|1
'04/15/2015 02:25:00'|102871|108|'LANDING'|0|1
NULL|103711|109|'BEER'|0|1
NULL|103711|109|'LANDING'|0|1
NULL|103711|109|'WINE'|0|1
'04/15/2016 02:17:00'|103711|109|'BEER'|0|1
'04/15/2016 02:18:00'|103711|109|'LANDING'|0|1
'04/15/2016 02:19:00'|103711|109|'WINE'|0|1
\\.

- Sessionize the table for each user_id, keeping all columns of the input
  table along with the new session_id column (the output is a view by default):

SELECT {schema_madlib}.sessionize(
    'eventlog',             -- Name of input table
    'sessionize_output',    -- Name of the view to store sessionized results
    'user_id',              -- Partition input table by user
    'event_timestamp',      -- Order partitions in input table by time
    '0:3:0'                 -- Events within a window of this time unit (180 seconds) must be in the same session
);

- View the output containing the session IDs:

SELECT * FROM sessionize_output;
DROP VIEW sessionize_output;

- Sessionize the table for each user_id, and materialize only the user_id,
  event_timestamp and session_id columns into an output table:

SELECT {schema_madlib}.sessionize(
    'eventlog',                 -- Name of input table
    'sessionize_output',        -- Table name to store sessionized results
    'user_id',                  -- Partition input table by user
    'event_timestamp',          -- Order partitions in input table by time
    '180',                      -- Events within a window of this time unit (180 seconds) must be in the same session
    'user_id, event_timestamp', -- Preserve only user_id and event_timestamp columns, along with the session id column
    'false'                     -- Materialize results into a table, and not a view
);

- View the output table containing the session IDs:

SELECT eventlog.*, sessionize_output.session_id FROM eventlog INNER JOIN sessionize_output ON
(eventlog.user_id=sessionize_output.user_id AND eventlog.event_timestamp=sessionize_output.event_timestamp);
DROP TABLE sessionize_output;
"""
    return help_string.format(schema_madlib=schema_madlib)