Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagerank: Remove duplicate entries from grouping output #294

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/ports/postgres/modules/graph/graph_utils.py_in
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,13 @@ def validate_params_for_link_analysis(schema_madlib, func_name,
format(func_name))

def update_output_grouping_tables_for_link_analysis(temp_summary_table,
iter_num, summary_table,
out_table, res_table,
grouping_cols_list,
cur_unconv,
message_unconv=None):
iter_num,
summary_table,
out_table,
res_table,
grouping_cols_list,
cur_unconv,
message_unconv=None):
"""
This function updates the summary and output tables only for those
groups that have converged. This is found out by looking at groups
Expand Down
71 changes: 47 additions & 24 deletions src/ports/postgres/modules/graph/pagerank.py_in
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_
WHERE __t1__.{src}=__t2__.{dest}
AND {where_group_clause}
) {vpg_where_clause_ins}
GROUP BY {select_group_cols}, {vertex_id}, pagerank
""".format(
select_group_cols=get_table_qualified_col_str(
'__t1__', grouping_cols_list),
Expand Down Expand Up @@ -526,6 +527,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_
# This is used only when grouping is set. This essentially will have
# the condition that will help skip the PageRank computation on groups
# that have converged.

plpy.execute("""
CREATE TABLE {message} AS
SELECT {grouping_cols_select_pr}
Expand All @@ -543,6 +545,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_
""".format(ignore_group_clause=ignore_group_clause_pr
if iteration_num > 0 else ignore_group_clause_first,
**locals()))

# If there are nodes that have no incoming edges, they are not
# captured in the message table. Insert entries for such nodes,
# with random_prob.
Expand All @@ -564,7 +567,19 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_
# If no grouping columns are specified, then we check if there is
# at least one unconverged node (limit 1 is used in the
# query).
plpy.execute("""
if iteration_num == 0 and grouping_cols:
# Hack to address corner case:
# With grouping, if there was a graph that converged in
# the very first iteration (a complete graph is an example
# of such a graph), then the pagerank scores for that
# group were not showing up in the output. The following
# code just prevents convergence in the first iteration.
plpy.execute("""
CREATE TEMP TABLE {message_unconv} AS
SELECT * FROM {distinct_grp_table}
""".format(**locals()))
else:
plpy.execute("""
CREATE TEMP TABLE {message_unconv} AS
SELECT {grouping_cols_select_conv}
FROM {message}
Expand All @@ -578,19 +593,23 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_
""".format(ignore_group_clause=ignore_group_clause_ins
if iteration_num > 0 else ignore_group_clause_conv,
**locals()))

unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
""".format(message_unconv))[0]["cnt"]
if iteration_num > 0 and grouping_cols:
# Update result and summary tables for groups that have
# converged
# since the last iteration.
update_output_grouping_tables_for_link_analysis(temp_summary_table,
iteration_num,
summary_table,
out_table, message,
grouping_cols_list,
cur_unconv,
message_unconv)
update_output_grouping_tables_for_link_analysis(
temp_summary_table,
iteration_num,
summary_table,
out_table,
message,
grouping_cols_list,
cur_unconv,
message_unconv)

plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
{cur_unconv} """.format(**locals()))
Expand All @@ -613,21 +632,24 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_
# We completed max_iters, but there are still some unconverged
# groups. Update the result and summary tables for unconverged
# groups.
update_output_grouping_tables_for_link_analysis(temp_summary_table,
iteration_num,
summary_table,
out_table, cur,
grouping_cols_list,
cur_unconv)
update_output_grouping_tables_for_link_analysis(
temp_summary_table,
iteration_num,
summary_table,
out_table,
cur,
grouping_cols_list,
cur_unconv)
else:
# No group has converged. The list of all group values is in
# distinct_grp_table.
update_output_grouping_tables_for_link_analysis(temp_summary_table,
iteration_num,
summary_table,
out_table, cur,
grouping_cols_list,
distinct_grp_table)
update_output_grouping_tables_for_link_analysis(
temp_summary_table,
iteration_num,
summary_table,
out_table, cur,
grouping_cols_list,
distinct_grp_table)

# updating the calculated pagerank value in case of
# Personalized Page Rank.
Expand All @@ -639,15 +661,16 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_
""".format(out_table=out_table,
total_ppr_nodes=total_ppr_nodes))
else:
# updating the calculated pagerank value in case of
# Personalized Page Rank.
# Ref :
# https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
# updating the calculated pagerank value in case of
# Personalized Page Rank.
# Ref :
# https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
if total_ppr_nodes > 1:
plpy.execute("""UPDATE {table_name} set pagerank =
pagerank / {total_ppr_nodes}::DOUBLE PRECISION
""".format(table_name=cur,
total_ppr_nodes=total_ppr_nodes))

plpy.execute("""
ALTER TABLE {table_name}
RENAME TO {out_table}
Expand Down
40 changes: 40 additions & 0 deletions src/ports/postgres/modules/graph/test/pagerank.sql_in
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,43 @@ select assert(array_agg(user_id order by pagerank desc)= '{2, 2, 1, 1, 2, 2, 1,
-- SELECT assert(relative_error(__iterations__, 31) = 0,
-- 'PageRank: Incorrect iterations for group 2.'
-- ) FROM pagerank_gr_out_summary WHERE user_id=2;

-- Test to capture corner case reported in https://issues.apache.org/jira/browse/MADLIB-1229
-- Runs PageRank with grouping on user_id over a small 3-vertex graph and
-- checks that the scores written for group 1 sum to 1.

-- Recreate the input tables from scratch.
DROP TABLE IF EXISTS vertex, "EDGE";
CREATE TABLE vertex(
id INTEGER
);
-- Edge list; user_id is the grouping column passed to pagerank() below.
CREATE TABLE "EDGE"(
src INTEGER,
dest INTEGER,
user_id INTEGER
);
-- Three vertices, shared by both groups.
INSERT INTO vertex VALUES
(0),
(1),
(2);
-- Rows are (src, dest, user_id): group 1 has four edges
-- (0->1, 0->2, 1->2, 2->1); group 2 has the single edge 0->1.
INSERT INTO "EDGE" VALUES
(0, 1, 1),
(0, 2, 1),
(1, 2, 1),
(2, 1, 1),
(0, 1, 2);


-- Drop any previous output and summary tables before calling pagerank().
DROP TABLE IF EXISTS pagerank_gr_out;
DROP TABLE IF EXISTS pagerank_gr_out_summary;
SELECT pagerank(
'vertex', -- Vertex table
'id', -- Vertex id column
'"EDGE"', -- "EDGE" table
'src=src, dest=dest', -- "EDGE" args
'pagerank_gr_out', -- Output table of PageRank
NULL, -- Default damping factor (0.85)
NULL, -- Default max iters (100)
NULL, -- Default Threshold
'user_id');

-- The PageRank scores of group 1 must sum to 1 (relative error < 0.00001).
SELECT assert(relative_error(SUM(pagerank), 1) < 0.00001,
'PageRank: Scores do not sum up to 1 for group 1.'
) FROM pagerank_gr_out WHERE user_id=1;