From e9b1d3a7922504b6633a6a71dfd46da860c1d10f Mon Sep 17 00:00:00 2001 From: Orhan Kislal Date: Mon, 22 Aug 2016 10:24:54 -0700 Subject: [PATCH 1/2] Pivoting Adds filter for removing nulls from the aggregated results. --- .../postgres/modules/utilities/pivot.py_in | 43 ++++++++++++++----- .../modules/utilities/test/pivot.sql_in | 14 +++--- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/src/ports/postgres/modules/utilities/pivot.py_in b/src/ports/postgres/modules/utilities/pivot.py_in index fca5be0e3..3ad63c5dd 100644 --- a/src/ports/postgres/modules/utilities/pivot.py_in +++ b/src/ports/postgres/modules/utilities/pivot.py_in @@ -183,7 +183,8 @@ def pivot(schema_madlib, source_table, out_table, index, pivot_cols, # Counter for the new output column names dict_counter = 0 - pivot_out_list = [] + pivot_str_sel_list = [] + pivot_str_from_list = [] # Prepare the wrapper for fill value if fill_value is not None: fill_str_begin = " COALESCE(" @@ -234,30 +235,46 @@ def pivot(schema_madlib, source_table, out_table, index, pivot_cols, dict_counter += 1 # Collecting the whole sql query # Please refer to the earlier comment for a sample output - pivot_str = ("{fill_str_begin}" - "{agg} (CASE WHEN {pivot_begin} THEN {pval} END) " - "{fill_str_end} AS {pivot_end}". - format(fill_str_begin=fill_str_begin, + + # Create the masked set + pivot_str_from = ("(CASE WHEN {pivot_begin} THEN {pval} END) " + "AS {pivot_end}". + format(fill_str_begin=fill_str_begin, + fill_str_end=fill_str_end, + pval=pval, agg=agg, + pivot_begin=' AND '.join(pivot_str_begin), + pivot_end=''.join(pivot_str_end))) + pivot_str_from_list.append("," + pivot_str_from) + # Apply the aggregate and filtering + pivot_str_sel = ("{fill_str_begin}" + "{agg} ({pivot_end}) FILTER (" + "WHERE {pivot_end} IS NOT NULL) " + "{fill_str_end} AS {pivot_end}". + format(fill_str_begin=fill_str_begin, fill_str_end=fill_str_end, pval=pval, agg=agg, pivot_begin=' AND '.join(pivot_str_begin), pivot_end=''.join(pivot_str_end))) - pivot_out_list.append("," + pivot_str) + pivot_str_sel_list.append("," + pivot_str_sel) try: plpy.execute(""" CREATE TABLE {out_table} AS - SELECT {index} {pivot_output} - FROM {source_table} + SELECT {index} {pivot_str_sel_list} + FROM ( SELECT {index} {pivot_str_from_list} + FROM {source_table}) x GROUP BY {index} """.format(out_table=out_table, - index=index, - source_table=source_table, - pivot_output=''.join(pivot_out_list))) + index=index, + source_table=source_table, + pivot_str_from_list=''.join(pivot_str_from_list), + pivot_str_sel_list=''.join(pivot_str_sel_list))) + if output_col_dictionary: plpy.execute("INSERT INTO {out_dict} VALUES {insert_sql}". format(out_dict=out_dict, insert_sql=', '.join(insert_str))) + except plpy.SPIError: # Warn user if the number of columns is over the limit with MinWarning("warning"): @@ -269,6 +286,10 @@ def pivot(schema_madlib, source_table, out_table, index, pivot_cols, "Pivot: Too many distinct values for pivoting! " "The execution may fail due to too many columns in the " "output table.") + else: + plpy.warning( + "Pivot: Pivoting is only supported over aggregates with " + "transition functions defined as STRICT.") raise return None diff --git a/src/ports/postgres/modules/utilities/test/pivot.sql_in b/src/ports/postgres/modules/utilities/test/pivot.sql_in index 30b64066a..79bcc5759 100644 --- a/src/ports/postgres/modules/utilities/test/pivot.sql_in +++ b/src/ports/postgres/modules/utilities/test/pivot.sql_in @@ -152,15 +152,19 @@ SELECT assert(__p_7__ = 1.5, 'Wrong output in pivoting: Output dictionary') FROM pivout WHERE id = 0 AND id2 = 0; -DROP AGGREGATE IF EXISTS array_accum (anyelement); -CREATE AGGREGATE array_accum (anyelement) -( - sfunc = array_append, +DROP FUNCTION IF EXISTS array_add1(ANYARRAY, ANYELEMENT); +DROP AGGREGATE IF EXISTS array_accum1 (anyelement); +CREATE FUNCTION array_add1(ANYARRAY, ANYELEMENT) RETURNS ANYARRAY AS $$ + SELECT $1 || $2 +$$ LANGUAGE sql STRICT; + +CREATE AGGREGATE array_accum1 (anyelement) ( + sfunc = array_add1, stype = anyarray, initcond = '{}' ); DROP TABLE IF EXISTS pivout; -SELECT pivot('pivset_ext', 'pivout', 'id', 'piv', 'val', 'array_accum'); +SELECT pivot('pivset_ext', 'pivout', 'id', 'piv', 'val', 'array_accum1'); SELECT * FROM pivout; DROP VIEW IF EXISTS pivset_ext; From 0e36a83439eebe08bdd3fbfe1c4febf269093f6e Mon Sep 17 00:00:00 2001 From: Orhan Kislal Date: Tue, 23 Aug 2016 09:36:47 -0700 Subject: [PATCH 2/2] Pivoting: Removes the redundant code --- .../postgres/modules/utilities/pivot.py_in | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/ports/postgres/modules/utilities/pivot.py_in b/src/ports/postgres/modules/utilities/pivot.py_in index 3ad63c5dd..3d99bf1d2 100644 --- a/src/ports/postgres/modules/utilities/pivot.py_in +++ b/src/ports/postgres/modules/utilities/pivot.py_in @@ -204,8 +204,7 @@ def pivot(schema_madlib, source_table, out_table, index, pivot_cols, # Prepare the entry for the dictionary insert_str.append("(\'__p_{dict_counter}__\', \'{pval}\', " "\'{agg}\' ".format(dict_counter=dict_counter, - pval=pval, - agg=agg)) + pval=pval, agg=agg)) # For every pivot column in a given combination for counter, pcol in enumerate(pcols): @@ -239,36 +238,32 @@ def pivot(schema_madlib, source_table, out_table, index, pivot_cols, # Create the masked set pivot_str_from = ("(CASE WHEN {pivot_begin} THEN {pval} END) " "AS {pivot_end}". - format(fill_str_begin=fill_str_begin, - fill_str_end=fill_str_end, - pval=pval, agg=agg, + format(pval=pval, pivot_begin=' AND '.join(pivot_str_begin), pivot_end=''.join(pivot_str_end))) - pivot_str_from_list.append("," + pivot_str_from) + pivot_str_from_list.append(pivot_str_from) # Apply the aggregate and filtering pivot_str_sel = ("{fill_str_begin}" "{agg} ({pivot_end}) FILTER (" "WHERE {pivot_end} IS NOT NULL) " "{fill_str_end} AS {pivot_end}". - format(fill_str_begin=fill_str_begin, + format(agg=agg, fill_str_begin=fill_str_begin, fill_str_end=fill_str_end, - pval=pval, agg=agg, - pivot_begin=' AND '.join(pivot_str_begin), pivot_end=''.join(pivot_str_end))) - pivot_str_sel_list.append("," + pivot_str_sel) + pivot_str_sel_list.append(pivot_str_sel) try: plpy.execute(""" CREATE TABLE {out_table} AS - SELECT {index} {pivot_str_sel_list} - FROM ( SELECT {index} {pivot_str_from_list} + SELECT {index}, {pivot_str_sel_list} + FROM ( SELECT {index}, {pivot_str_from_list} FROM {source_table}) x GROUP BY {index} """.format(out_table=out_table, index=index, source_table=source_table, - pivot_str_from_list=''.join(pivot_str_from_list), - pivot_str_sel_list=''.join(pivot_str_sel_list))) + pivot_str_from_list=', '.join(pivot_str_from_list), + pivot_str_sel_list=', '.join(pivot_str_sel_list))) if output_col_dictionary: plpy.execute("INSERT INTO {out_dict} VALUES {insert_sql}".