From 1b55acac9d5550e0a74fa46ec0ab4842d089ac1c Mon Sep 17 00:00:00 2001 From: Orhan Kislal Date: Fri, 13 Jul 2018 17:09:11 -0700 Subject: [PATCH 1/2] Pagerank: Remove duplicate entries from grouping output JIRA: MADLIB-1229 JIRA: MADLIB-1253 Fixes the missing output for complete graphs bug as well. Co-authored-by: Nandish Jayaram --- .../postgres/modules/graph/graph_utils.py_in | 12 ++-- .../postgres/modules/graph/pagerank.py_in | 71 ++++++++++++------- 2 files changed, 54 insertions(+), 29 deletions(-) diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in b/src/ports/postgres/modules/graph/graph_utils.py_in index 8a41560f1..b0eaee427 100644 --- a/src/ports/postgres/modules/graph/graph_utils.py_in +++ b/src/ports/postgres/modules/graph/graph_utils.py_in @@ -128,11 +128,13 @@ def validate_params_for_link_analysis(schema_madlib, func_name, format(func_name)) def update_output_grouping_tables_for_link_analysis(temp_summary_table, - iter_num, summary_table, - out_table, res_table, - grouping_cols_list, - cur_unconv, - message_unconv=None): + iter_num, + summary_table, + out_table, + res_table, + grouping_cols_list, + cur_unconv, + message_unconv=None): """ This function updates the summary and output tables only for those groups that have converged. This is found out by looking at groups diff --git a/src/ports/postgres/modules/graph/pagerank.py_in b/src/ports/postgres/modules/graph/pagerank.py_in index 31377c08b..71cddd24e 100644 --- a/src/ports/postgres/modules/graph/pagerank.py_in +++ b/src/ports/postgres/modules/graph/pagerank.py_in @@ -455,6 +455,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_ WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause} ) {vpg_where_clause_ins} + GROUP BY {select_group_cols}, {vertex_id}, pagerank """.format( select_group_cols=get_table_qualified_col_str( '__t1__', grouping_cols_list), @@ -526,6 +527,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_ # This is used only when grouping is set. This essentially will have # the condition that will help skip the PageRank computation on groups # that have converged. + plpy.execute(""" CREATE TABLE {message} AS SELECT {grouping_cols_select_pr} @@ -543,6 +545,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_ """.format(ignore_group_clause=ignore_group_clause_pr if iteration_num > 0 else ignore_group_clause_first, **locals())) + # If there are nodes that have no incoming edges, they are not # captured in the message table. Insert entries for such nodes, # with random_prob. @@ -564,7 +567,19 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_ # If no grouping columns are specified, then we check if there is # at least one unconverged node (limit 1 is used in the # query). - plpy.execute(""" + if iteration_num == 0 and grouping_cols: + # Hack to address corner case: + # With grouping, if there was a graph that converged in + # the very first iteration (a complete graph is an eg. + # of such a graph), then the pagerank scores for that + # group was not showing up in the output. The following + # code just prevents convergence in the first iteration. + plpy.execute(""" + CREATE TEMP TABLE {message_unconv} AS + SELECT * FROM {distinct_grp_table} + """.format(**locals())) + else: + plpy.execute(""" CREATE TEMP TABLE {message_unconv} AS SELECT {grouping_cols_select_conv} FROM {message} @@ -578,19 +593,23 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_ """.format(ignore_group_clause=ignore_group_clause_ins if iteration_num > 0 else ignore_group_clause_conv, **locals())) + unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0} """.format(message_unconv))[0]["cnt"] if iteration_num > 0 and grouping_cols: # Update result and summary tables for groups that have # converged # since the last iteration. - update_output_grouping_tables_for_link_analysis(temp_summary_table, - iteration_num, - summary_table, - out_table, message, - grouping_cols_list, - cur_unconv, - message_unconv) + update_output_grouping_tables_for_link_analysis( + temp_summary_table, + iteration_num, + summary_table, + out_table, + message, + grouping_cols_list, + cur_unconv, + message_unconv) + plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv)) plpy.execute("""ALTER TABLE {message_unconv} RENAME TO {cur_unconv} """.format(**locals())) @@ -613,21 +632,24 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_ # We completed max_iters, but there are still some unconverged # groups # Update the result and summary tables for unconverged # groups. - update_output_grouping_tables_for_link_analysis(temp_summary_table, - iteration_num, - summary_table, - out_table, cur, - grouping_cols_list, - cur_unconv) + update_output_grouping_tables_for_link_analysis( + temp_summary_table, + iteration_num, + summary_table, + out_table, + cur, + grouping_cols_list, + cur_unconv) else: # No group has converged. List of all group values are in # distinct_grp_table. - update_output_grouping_tables_for_link_analysis(temp_summary_table, - iteration_num, - summary_table, - out_table, cur, - grouping_cols_list, - distinct_grp_table) + update_output_grouping_tables_for_link_analysis( + temp_summary_table, + iteration_num, + summary_table, + out_table, cur, + grouping_cols_list, + distinct_grp_table) # updating the calculated pagerank value in case of # Personalized Page Rank. @@ -639,15 +661,16 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_ """.format(out_table=out_table, total_ppr_nodes=total_ppr_nodes)) else: - # updating the calculated pagerank value in case of - # Personalized Page Rank. - # Ref : - # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html + # updating the calculated pagerank value in case of + # Personalized Page Rank. + # Ref : + # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html if total_ppr_nodes > 1: plpy.execute("""UPDATE {table_name} set pagerank = pagerank / {total_ppr_nodes}::DOUBLE PRECISION """.format(table_name=cur, total_ppr_nodes=total_ppr_nodes)) + plpy.execute(""" ALTER TABLE {table_name} RENAME TO {out_table} From 16d4deb2424cc72b8758430e37e17ffa396e655a Mon Sep 17 00:00:00 2001 From: Nandish Jayaram Date: Mon, 16 Jul 2018 16:33:04 -0700 Subject: [PATCH 2/2] add new test case in dev-check --- .../modules/graph/test/pagerank.sql_in | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/ports/postgres/modules/graph/test/pagerank.sql_in b/src/ports/postgres/modules/graph/test/pagerank.sql_in index 4b93075b9..14d337128 100644 --- a/src/ports/postgres/modules/graph/test/pagerank.sql_in +++ b/src/ports/postgres/modules/graph/test/pagerank.sql_in @@ -149,3 +149,43 @@ select assert(array_agg(user_id order by pagerank desc)= '{2, 2, 1, 1, 2, 2, 1, -- SELECT assert(relative_error(__iterations__, 31) = 0, -- 'PageRank: Incorrect iterations for group 2.' -- ) FROM pagerank_gr_out_summary WHERE user_id=2; + +-- Test to capture corner case reported in https://issues.apache.org/jira/browse/MADLIB-1229 + +DROP TABLE IF EXISTS vertex, "EDGE"; +CREATE TABLE vertex( +id INTEGER +); +CREATE TABLE "EDGE"( +src INTEGER, +dest INTEGER, +user_id INTEGER +); +INSERT INTO vertex VALUES +(0), +(1), +(2); +INSERT INTO "EDGE" VALUES +(0, 1, 1), +(0, 2, 1), +(1, 2, 1), +(2, 1, 1), +(0, 1, 2); + + +DROP TABLE IF EXISTS pagerank_gr_out; +DROP TABLE IF EXISTS pagerank_gr_out_summary; +SELECT pagerank( +'vertex', -- Vertex table +'id', -- Vertix id column +'"EDGE"', -- "EDGE" table +'src=src, dest=dest', -- "EDGE" args +'pagerank_gr_out', -- Output table of PageRank +NULL, -- Default damping factor (0.85) +NULL, -- Default max iters (100) +NULL, -- Default Threshold +'user_id'); + +SELECT assert(relative_error(SUM(pagerank), 1) < 0.00001, + 'PageRank: Scores do not sum up to 1 for group 1.' + ) FROM pagerank_gr_out WHERE user_id=1;