From 298aeae2ab31bd59edac5e7784c53816844988ab Mon Sep 17 00:00:00 2001 From: Orhan Kislal Date: Mon, 5 Dec 2016 09:38:42 -0800 Subject: [PATCH 1/5] Graph: SSSP JIRA: MADLIB-992 - Introduces a new module: Graph. - Implements the single source shortest path algorithm (Bellman-Ford). --- doc/mainpage.dox.in | 4 + src/config/Modules.yml | 1 + .../postgres/modules/graph/__init__.py_in | 0 src/ports/postgres/modules/graph/sssp.py_in | 345 ++++++++++++++++++ src/ports/postgres/modules/graph/sssp.sql_in | 255 +++++++++++++ .../postgres/modules/graph/test/sssp.sql_in | 78 ++++ .../postgres/modules/utilities/pivot.sql_in | 84 ++--- 7 files changed, 725 insertions(+), 42 deletions(-) create mode 100644 src/ports/postgres/modules/graph/__init__.py_in create mode 100644 src/ports/postgres/modules/graph/sssp.py_in create mode 100644 src/ports/postgres/modules/graph/sssp.sql_in create mode 100644 src/ports/postgres/modules/graph/test/sssp.sql_in diff --git a/doc/mainpage.dox.in b/doc/mainpage.dox.in index 00ef5465b..31c7ba954 100644 --- a/doc/mainpage.dox.in +++ b/doc/mainpage.dox.in @@ -119,6 +119,10 @@ complete matrix stored as a distributed table. @defgroup grp_stemmer Stemming @ingroup grp_datatrans +@defgroup grp_graph Graph +@{Contains graph algorithms. @} + @defgroup grp_sssp Single Source Shortest Path + @ingroup grp_graph @defgroup grp_mdl Model Evaluation @{Contains functions for evaluating accuracy and validation of predictive methods. 
@} diff --git a/src/config/Modules.yml b/src/config/Modules.yml index fd3c5e63b..c3315b6ac 100644 --- a/src/config/Modules.yml +++ b/src/config/Modules.yml @@ -14,6 +14,7 @@ modules: - name: elastic_net - name: glm depends: ['utilities'] + - name: graph - name: kmeans depends: ['array_ops', 'svec_util', 'sample'] - name: lda diff --git a/src/ports/postgres/modules/graph/__init__.py_in b/src/ports/postgres/modules/graph/__init__.py_in new file mode 100644 index 000000000..e69de29bb diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in new file mode 100644 index 000000000..5d5ee786c --- /dev/null +++ b/src/ports/postgres/modules/graph/sssp.py_in @@ -0,0 +1,345 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Single Source Shortest Path + +# Please refer to the sssp.sql_in file for the documentation + +""" +@file sssp.py_in + +@namespace graph +""" + +import plpy +from utilities.control import MinWarning +from utilities.utilities import _assert +from utilities.utilities import extract_keyvalue_params +from utilities.utilities import unique_string +from utilities.validate_args import get_cols +from utilities.validate_args import unquote_ident +from utilities.validate_args import table_exists +from utilities.validate_args import columns_exist_in_table +from utilities.validate_args import table_is_empty + +m4_changequote(`') + +def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table, + edge_args, source_vertex, out_table, **kwargs): + """ + Single source shortest path function for graphs + Args: + @param vertex_table Name of the table that contains the vertex data. + @param vertex_id Name of the column containing the vertex ids. + @param edge_table Name of the table that contains the edge data. + @param edge_args A comma-delimited string containing multiple + named arguments of the form "name=value". + @param source_vertex The source vertex id for the algorithm to start. + @param out_table Name of the table to store the result of SSSP. 
+ """ + + with MinWarning("warning"): + + INT_MAX = 2147483647 + + message = unique_string(desp='message') + toupdate = unique_string(desp='toupdate') + + params_types = {'src': str, 'dest': str, 'weight': str} + default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} + edge_params = extract_keyvalue_params(edge_args, + params_types, + default_args) + if vertex_id is None: + vertex_id = "id" + + src = edge_params["src"] + dest = edge_params["dest"] + weight = edge_params["weight"] + + distribution = m4_ifdef(, , + ) + local_distribution = m4_ifdef(, , + ) + + validate_graph_coding(vertex_table, vertex_id, edge_table, + edge_params, source_vertex, out_table) + + plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(message,toupdate)) + + plpy.execute( + """ CREATE TABLE {out_table} AS + SELECT {vertex_id}::INT AS {vertex_id}, + CAST({INT_MAX} AS INT) AS {weight}, + CAST({INT_MAX} AS INT) AS parent + FROM {vertex_table} {distribution} """.format(**locals())) + + plpy.execute( + """ CREATE TEMP TABLE {message}( + id INT, val INT, parent INT) + {local_distribution} """.format(**locals())) + plpy.execute( + """ CREATE TEMP TABLE {toupdate}( + id INT, val INT, parent INT) + {local_distribution} """.format(**locals())) + temp_table = unique_string(desp='temp') + sql = m4_ifdef(, + """ CREATE TABLE {temp_table} ( + {vertex_id} INT, {weight} INT, parent INT) {distribution}; + """, ) + plpy.execute(sql.format(**locals())) + + plpy.execute( + """ INSERT INTO {message} VALUES({source_vertex},0,{source_vertex}) + """.format(**locals())) + + v_cnt = plpy.execute("SELECT count(*) FROM {vertex_table}". 
+ format(**locals())) + for i in range(0,v_cnt[0]['count']): + + plpy.execute("TRUNCATE TABLE {0}".format(toupdate)) + plpy.execute( + """ INSERT INTO {toupdate} + SELECT DISTINCT ON (message.id) message.id AS id, + message.val AS val, + message.parent AS parent + FROM {message} AS message, {out_table} AS out_table + WHERE message.id = out_table.{vertex_id} + AND message.val, + , + ) + plpy.execute(sql.format(**locals())) + plpy.execute("TRUNCATE TABLE {0}".format(message)) + plpy.execute( + """ INSERT INTO {message} + SELECT edge_table.{dest} AS id, x.val AS val, + toupdate.id AS parent + FROM {toupdate} AS toupdate, {edge_table} AS edge_table, ( + SELECT edge_table.{dest} AS id, + min(toupdate.val + edge_table.{weight}) AS val + FROM {toupdate} AS toupdate, + {edge_table} AS edge_table + WHERE edge_table.{src}=toupdate.id + GROUP BY edge_table.{dest}) x + WHERE edge_table.{src} = toupdate.id + AND edge_table.{dest} = x.id + AND toupdate.val + edge_table.{weight} = x.val + """.format(**locals())) + + ncycle = plpy.execute( + """ SELECT out_table1.{vertex_id} AS v1, + out_table2.{vertex_id} AS v2 + FROM {out_table} AS out_table1 , {out_table} AS out_table2, + {edge_table} AS edge_table + WHERE out_table1.{vertex_id} = edge_table.{src} AND + out_table2.{vertex_id} = edge_table.{dest} AND + out_table1.{weight} + edge_table.{weight} < + out_table2.{weight} + LIMIT 1 + """.format(**locals())) + if ncycle: + plpy.error("Graph SSSP: Detected a negative cycle in the graph.") + + sql = m4_ifdef(, + """ DROP TABLE {temp_table} """, ) + plpy.execute(sql.format(**locals())) + + return None + +def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs): + """ + Helper function that can be used to get the shortest path for a vertex + Args: + @param source_table Name of the table that contains the SSSP output. + @param out_table The vertex that will be the destination of the + desired path. 
+ """ + validate_get_path(sssp_table, dest_vertex) + cur = dest_vertex + cols = get_cols(sssp_table) + id = cols[0] + ret = [dest_vertex] + sql = "SELECT parent FROM {sssp_table} WHERE {id} = {cur} LIMIT 1" + parent = plpy.execute(sql.format(**locals())) + if parent is None: + plpy.error( + "Graph SSSP: Vertex {0} is not present in the sssp table {1}". + format(dest_vertex,sssp_table)) + + while 1: + parent = parent[0]['parent'] + if parent == cur: + ret.reverse() + return ret + else: + ret.append(parent) + cur = parent + + parent = plpy.execute(sql.format(**locals())) + + return None + +def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, + source_vertex, out_table, **kwargs): + + _assert(out_table and out_table.strip().lower() not in ('null', ''), + "Graph SSSP: Invalid output table name!") + _assert(not table_exists(out_table), + "Graph SSSP: Output table already exists!") + + _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''), + "Graph SSSP: Invalid vertex table name!") + _assert(table_exists(vertex_table), + "Graph SSSP: Vertex table ({0}) is missing!".format(vertex_table)) + _assert(not table_is_empty(vertex_table), + "Graph SSSP: Vertex table ({0}) is empty!".format(vertex_table)) + + _assert(edge_table and edge_table.strip().lower() not in ('null', ''), + "Graph SSSP: Invalid vertex table name!") + _assert(table_exists(edge_table), + "Graph SSSP: Edge table ({0}) is missing!".format(edge_table)) + _assert(not table_is_empty(edge_table), + "Graph SSSP: Edge table ({0}) is empty!".format(edge_table)) + + existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table)) + _assert(vertex_id in existing_cols, """Graph SSSP: The {vertex_id} + column is not present in vertex table ({vertex_table}) + """.format(**locals())) + _assert(columns_exist_in_table(edge_table, edge_params.values()), + "Graph SSSP: Not all columns from {0} present in edge table ({1})". 
+ format(edge_params.values(), edge_table)) + + return None + +def validate_get_path(sssp_table, dest_vertex, **kwargs): + + _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''), + "Graph SSSP: Invalid SSSP table name!") + _assert(table_exists(sssp_table), + "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table)) + _assert(not table_is_empty(sssp_table), + "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table)) + + +def graph_sssp_help(schema_madlib, message, **kwargs): + """ + Help function for graph_sssp and graph_sssp_get_path + + Args: + @param schema_madlib + @param message: string, Help message string + @param kwargs + + Returns: + String. Help/usage information + """ + if not message: + help_string = """ +----------------------------------------------------------------------- + SUMMARY +----------------------------------------------------------------------- + +Given a graph and a source vertex, single source shortest path (SSSP) +algorithm finds a path for every vertex such that the the sum of the +weights of its constituent edges is minimized. + +For more details on function usage: + SELECT {schema_madlib}.graph_sssp('usage') + """ + elif message in ['usage', 'help', '?']: + help_string = """ +---------------------------------------------------------------------------- + USAGE +---------------------------------------------------------------------------- + SELECT {schema_madlib}.graph_sssp( + vertex_table TEXT, -- Name of the table that contains the vertex data. + vertex_id TEXT, -- Name of the column containing the vertex ids. + edge_table TEXT, -- Name of the table that contains the edge data. + edge_args TEXT, -- A comma-delimited string containing multiple + -- named arguments of the form "name=value". + source_vertex INT, -- The source vertex id for the algorithm to start. + out_table TEXT -- Name of the table to store the result of SSSP. 
+); + +The following parameters are supported for edge table arguments ('edge_args' + above): + +src (default = 'src') : Name of the column containing the source + vertex ids in the edge table. +dest (default = 'dest') : Name of the column containing the destination + vertex ids in the edge table. +weight (default = 'weight') : Name of the column containing the weight of + edges in the edge table. + +To retrieve the path for a specific vertex: + + SELECT {schema_madlib}.graph_sssp_get_path( + sssp_table TEXT, -- Name of the table that contains the SSSP output. + dest_vertex INT -- The vertex that will be the destination of the + -- desired path. +); + +---------------------------------------------------------------------------- + OUTPUT +---------------------------------------------------------------------------- +The output table ('out_table' above) will contain a row for every vertex from +vertex_table and have the following columns: + +vertex_id : The id for the destination. Will use the input parameter + (vertex_id) for column naming. +weight : The total weight of the shortest path from the source vertex + to this particular vertex. Will use the input parameter (weight) + for column naming. +parent : The parent of this vertex in the shortest path from source. + Will use "parent" for column naming. + +The graph_sssp_get_path function will return an INT array that contains the +shortest path from the initial source vertex to the desired destination vertex. +""" + else: + help_string = "No such option. 
Use {schema_madlib}.graph_sssp()" + + return help_string.format(schema_madlib=schema_madlib) +# --------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/sssp.sql_in b/src/ports/postgres/modules/graph/sssp.sql_in new file mode 100644 index 000000000..6b82f6dcb --- /dev/null +++ b/src/ports/postgres/modules/graph/sssp.sql_in @@ -0,0 +1,255 @@ +/* ----------------------------------------------------------------------- *//** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + * @file graph.sql_in + * + * @brief SQL functions for graph analytics + * @date Nov 2016 + * + * @sa Provides various graph algorithms. + * + *//* ----------------------------------------------------------------------- */ +m4_include(`SQLCommon.m4') + + +/** +@addtogroup grp_sssp + +
Contents + +
+ +@brief Finds the shortest path from a single source vertex to every other vertex in a given graph. + +Given a graph and a source vertex, the single source shortest path (SSSP) algorithm finds a path for every vertex such that the sum of the weights of its constituent edges is minimized. + +@anchor sssp +@par SSSP +
+graph_sssp(
+    vertex_table            TEXT,
+    vertex_id               TEXT,
+    edge_table              TEXT,
+    edge_args               TEXT,
+    source_vertex           INT,
+    out_table               TEXT
+)
+
+ +\b Arguments +
+
vertex_table
+
TEXT. Name of the table that contains the vertex data. Has to contain the column specified in the vertex_id parameter.
+ +
vertex_id
+
TEXT, default = 'id'. Name of the column containing the vertex ids in the vertex table.
+ +
edge_table
+
TEXT. Name of the table that contains the edge data. Has to contain the columns specified in the edge_args parameter.
+ +
edge_args
+
TEXT. A comma-delimited string containing multiple named arguments of the form "name=value". The following parameters are supported for this string argument: + - src (default = 'src'): Name of the column containing the source vertex ids in the edge table. + - dest (default = 'dest'): Name of the column containing the destination vertex ids in the edge table. + - weight (default = 'weight'): Name of the column containing the weight of edges in the edge table.
+ +
source_vertex
+
INT. The source vertex id for the algorithm to start. This vertex id has to exist in the vertex_id column of vertex_table.
+ +
out_table
+
TEXT. Name of the table to store the result of SSSP. It will contain a row for every vertex from vertex_table and have the following columns: + - vertex_id : The id for the destination. Will use the input parameter (vertex_id) for column naming. + - weight : The total weight of the shortest path from the source vertex to this particular vertex. Will use the input parameter (weight) for column naming. + - parent : The parent of this vertex in the shortest path from source. Will use "parent" for column naming.
+
+ +@par Path Retrieval + +
+graph_sssp_get_path(
+    sssp_table            TEXT,
+    dest_vertex           INT
+)
+
+ +\b Arguments +
+
sssp_table
+
TEXT. Name of the table that contains the SSSP output.
+ +
dest_vertex
+
INT. The vertex that will be the destination of the desired path.
+
+ +@anchor notes +@par Notes + +The Bellman-Ford algorithm [1] is used for implementing SSSP. This implementation allows negative edge weights as long as there are no negative cycles. In the case of graphs with negative cycles, the function raises an error; because the error aborts the enclosing statement, no output table is generated. + +The implementation is inspired by the GRAIL project [2]. + +@anchor examples +@examp + +- Create a vertex and an edge table to represent the graph. +
+DROP TABLE IF EXISTS vertex,edge,out;
+CREATE TABLE vertex(
+	id INTEGER
+);
+CREATE TABLE edge(
+	src INTEGER,
+	dest INTEGER,
+	weight INTEGER
+);
+INSERT INTO vertex VALUES
+(0),
+(1),
+(2),
+(3),
+(4),
+(5),
+(6),
+(7)
+;
+INSERT INTO edge VALUES
+(0, 1, 1),
+(0, 2, 1),
+(0, 4, 10),
+(1, 2, 2),
+(1, 3, 10),
+(2, 3, 1),
+(2, 5, 1),
+(2, 6, 3),
+(3, 0, 1),
+(4, 0, -2),
+(5, 6, 1),
+(6, 7, 1)
+;
+
+ +- Calculate the shortest paths from vertex 0 +
+SELECT graph_sssp('vertex',NULL,'edge',NULL,0,'out');
+SELECT * FROM out;
+
+
+ id | weight | parent
+----+--------+--------
+  0 |      0 |      0
+  1 |      1 |      0
+  2 |      1 |      0
+  4 |     10 |      0
+  3 |      2 |      2
+  5 |      2 |      2
+  6 |      3 |      5
+  7 |      4 |      6
+
+ +- Get the shortest path to vertex 6 +
+SELECT graph_sssp_get_path('out',6) AS spath;
+
+
+   spath
+\-----------
+ {0,2,5,6}
+
+ +- Use different column names in the tables +
+DROP TABLE IF EXISTS vertex_alt,edge_alt,out_alt;
+CREATE TABLE vertex_alt AS SELECT id AS v_id FROM vertex;
+CREATE TABLE edge_alt AS SELECT src AS e_src, dest, weight AS e_weight FROM edge;
+
+ +- Get the shortest path from vertex 1 +
+SELECT graph_sssp('vertex_alt','v_id','edge_alt','src=e_src, weight=e_weight',1,'out_alt');
+SELECT * FROM out_alt;
+
+
+ v_id | e_weight | parent
+------+----------+--------
+    1 |        0 |      1
+    2 |        2 |      1
+    3 |        3 |      2
+    5 |        3 |      2
+    0 |        4 |      3
+    6 |        4 |      5
+    4 |       14 |      0
+    7 |        5 |      6
+
+ +@anchor literature +@par Literature + +[1] Bellman–Ford algorithm. https://en.wikipedia.org/wiki/Bellman%E2%80%93Ford_algorithm. + +[2] The case against specialized graph analytics engines, J. Fan, G. Soosai Raj, and J. M. Patel. CIDR 2015. http://pages.cs.wisc.edu/~jignesh/publ/Grail.pdf +*/ + +------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp( + vertex_table TEXT, + vertex_id TEXT, + edge_table TEXT, + edge_args TEXT, + source_vertex INT, + out_table TEXT + +) RETURNS VOID AS $$ + PythonFunction(graph, sssp, graph_sssp) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp_get_path( + sssp_table TEXT, + dest_vertex INT + +) RETURNS INT[] AS $$ + PythonFunction(graph, sssp, graph_sssp_get_path) +$$ LANGUAGE plpythonu IMMUTABLE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `CONTAINS SQL', `'); +------------------------------------------------------------------------- + +-- Online help +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp( + message VARCHAR +) RETURNS VARCHAR AS $$ + PythonFunction(graph, sssp, graph_sssp_help) +$$ LANGUAGE plpythonu IMMUTABLE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `CONTAINS SQL', `'); + +-------------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp() +RETURNS VARCHAR AS $$ + SELECT MADLIB_SCHEMA.graph_sssp(''); +$$ LANGUAGE sql IMMUTABLE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `CONTAINS SQL', `'); +-------------------------------------------------------------------------------- + diff --git a/src/ports/postgres/modules/graph/test/sssp.sql_in b/src/ports/postgres/modules/graph/test/sssp.sql_in new file mode 100644 index 000000000..e2342c59d --- /dev/null +++ 
b/src/ports/postgres/modules/graph/test/sssp.sql_in @@ -0,0 +1,78 @@ +/* ----------------------------------------------------------------------- *//** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + *//* ----------------------------------------------------------------------- */ + + +DROP TABLE IF EXISTS vertex,edge,out,vertex_alt,edge_alt,out_alt; + + +CREATE TABLE vertex( + id INTEGER + ); + +CREATE TABLE edge( + src INTEGER, + dest INTEGER, + weight INTEGER + ); + +INSERT INTO vertex VALUES +(0), +(1), +(2), +(3), +(4), +(5), +(6), +(7) +; +INSERT INTO edge VALUES +(0, 1, 1), +(0, 2, 1), +(0, 4, 10), +(1, 2, 2), +(1, 3, 10), +(2, 3, 1), +(2, 5, 1), +(2, 6, 3), +(3, 0, 1), +(4, 0, -2), +(5, 6, 1), +(6, 7, 1) +; + +SELECT graph_sssp('vertex',NULL,'edge',NULL,0,'out'); + +SELECT * FROM out; + +SELECT assert(weight = 3, 'Wrong output in graph (SSSP)') FROM out WHERE id = 6; +SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)') FROM out WHERE id = 6; + +SELECT graph_sssp_get_path('out',6); + +CREATE TABLE vertex_alt AS SELECT id AS v_id FROM vertex; +CREATE TABLE edge_alt AS SELECT src AS e_src, dest, weight AS e_weight FROM edge; + +SELECT graph_sssp('vertex_alt','v_id','edge_alt','src=e_src, 
weight=e_weight',1,'out_alt'); + +SELECT * FROM out_alt; + +SELECT assert(e_weight = 4, 'Wrong output in graph (SSSP)') FROM out_alt WHERE v_id = 6; +SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)') FROM out_alt WHERE v_id = 6; diff --git a/src/ports/postgres/modules/utilities/pivot.sql_in b/src/ports/postgres/modules/utilities/pivot.sql_in index 2cadab038..1c36cef86 100644 --- a/src/ports/postgres/modules/utilities/pivot.sql_in +++ b/src/ports/postgres/modules/utilities/pivot.sql_in @@ -21,7 +21,7 @@ * @file pivot.sql_in * * @brief SQL functions for pivoting - * @date June 2014 + * @date June 2016 * * @sa Creates a pivot table for data summarization. * @@ -34,7 +34,7 @@ m4_include(`SQLCommon.m4')
Contents
    -
  • Pivoting
  • +
  • Pivoting
  • Notes
  • Examples
  • Literature
  • @@ -43,7 +43,7 @@ m4_include(`SQLCommon.m4') @brief Provides pivoting functions helpful for data preparation before modeling -@anchor categorical +@anchor pivoting The goal of the MADlib pivot function is to provide a data summarization tool that can do basic OLAP type operations on data stored in one table and output the summarized data to a second table. @@ -70,22 +70,22 @@ pivot(
    output_table
    VARCHAR. Name of output table that contains the pivoted data. The output table contains all the columns present in - the 'index' column list, plus additional columns for each - distinct value in 'pivot_cols'. + the 'index' column list, plus additional columns for each + distinct value in 'pivot_cols'. @note The names of the columns in the output table are auto-generated. Please see the examples section below to see how this works in practice. - The convention used is to concatenate the following strings and separate + The convention used is to concatenate the following strings and separate each by an underscore '_' : - name of the value column 'pivot_values' - aggregate function - name of the pivot column 'pivot_cols' - values in the pivot column - +
    index
    VARCHAR. Comma-separated columns that will form the index of the output - pivot table. By index we mean the values to group by; these are the rows + pivot table. By index we mean the values to group by; these are the rows in the output pivot table.
    pivot_cols
    VARCHAR. Comma-separated columns that will form the columns of the @@ -99,13 +99,13 @@ pivot( possible to assign a set of aggregates per value column. Please refer to the examples 12\-14 below for syntax details.
    - @note Only aggregates with + @note Only aggregates with strict transition functions are permitted here. - A strict transition function means rows with null values are ignored; - the function is not called and the previous state value is retained. + A strict transition function means rows with null values are ignored; + the function is not called and the previous state value is retained. If you need some other behavior for null inputs, this should be done prior to calling the pivot function. - Aggregates with strict transition + Aggregates with strict transition functions are described in [2,3].
    fill_value (optional)
    @@ -133,7 +133,7 @@ pivot( @note - NULLs in the index column are treated like any other value. - NULLs in the pivot column are ignored unless keep_null is TRUE. -- Only strict transition functions are +- Only strict transition functions are allowed so NULLs are ignored. - It is not allowed to set the fill_value parameter without setting the aggregate_func parameter due to possible ambiguity. Set @@ -199,7 +199,7 @@ SELECT id,id2,piv,piv2,val,val2 FROM pivset_ext ORDER BY id,id2,piv,piv2,val,val2;
    - id | id2 | piv | piv2 | val | val2 
    + id | id2 | piv | piv2 | val | val2
     ----+-----+-----+------+-----+------
       0 |   0 |  10 |    0 |   1 |   11
       0 |   1 |  10 |  100 |   2 |   12
    @@ -240,14 +240,14 @@ DROP AGGREGATE IF EXISTS array_accum1 (anyelement);
     CREATE AGGREGATE array_accum1 (anyelement) (
         sfunc = array_add1,
         stype = anyarray,
    -    initcond = '{}'                                                                                                                                           
    +    initcond = '{}'
     );
     DROP TABLE IF EXISTS pivout;
     SELECT madlib.pivot('pivset_ext', 'pivout', 'id', 'piv', 'val', 'array_accum1');
     SELECT * FROM pivout ORDER BY id;
     
    -  id | val_array_accum1_piv_10 | val_array_accum1_piv_20 | val_array_accum1_piv_30 
    +  id | val_array_accum1_piv_10 | val_array_accum1_piv_20 | val_array_accum1_piv_30
     ----+-------------------------+-------------------------+-------------------------
       0 | {1,2}                   | {3}                     | {}
       1 | {7}                     | {4}                     | {5,6}
    @@ -316,30 +316,30 @@ SELECT * FROM pivout ORDER BY id;
     id                      | 0
     val_avg_piv_10_piv2_0   | 1
     val_avg_piv_10_piv2_100 | 2
    -val_avg_piv_10_piv2_200 | 
    -val_avg_piv_10_piv2_300 | 
    -val_avg_piv_20_piv2_0   | 
    +val_avg_piv_10_piv2_200 |
    +val_avg_piv_10_piv2_300 |
    +val_avg_piv_20_piv2_0   |
     val_avg_piv_20_piv2_100 | 3
    -val_avg_piv_20_piv2_200 | 
    -val_avg_piv_20_piv2_300 | 
    -val_avg_piv_30_piv2_0   | 
    -val_avg_piv_30_piv2_100 | 
    -val_avg_piv_30_piv2_200 | 
    -val_avg_piv_30_piv2_300 | 
    +val_avg_piv_20_piv2_200 |
    +val_avg_piv_20_piv2_300 |
    +val_avg_piv_30_piv2_0   |
    +val_avg_piv_30_piv2_100 |
    +val_avg_piv_30_piv2_200 |
    +val_avg_piv_30_piv2_300 |
     -[ RECORD 2 ]-----------+----
     id                      | 1
    -val_avg_piv_10_piv2_0   | 
    -val_avg_piv_10_piv2_100 | 
    +val_avg_piv_10_piv2_0   |
    +val_avg_piv_10_piv2_100 |
     val_avg_piv_10_piv2_200 | 7
    -val_avg_piv_10_piv2_300 | 
    -val_avg_piv_20_piv2_0   | 
    +val_avg_piv_10_piv2_300 |
    +val_avg_piv_20_piv2_0   |
     val_avg_piv_20_piv2_100 | 4
    -val_avg_piv_20_piv2_200 | 
    -val_avg_piv_20_piv2_300 | 
    -val_avg_piv_30_piv2_0   | 
    -val_avg_piv_30_piv2_100 | 
    +val_avg_piv_20_piv2_200 |
    +val_avg_piv_20_piv2_300 |
    +val_avg_piv_30_piv2_0   |
    +val_avg_piv_30_piv2_100 |
     val_avg_piv_30_piv2_200 | 5.5
    -val_avg_piv_30_piv2_300 | 
    +val_avg_piv_30_piv2_300 |
     ...
     
    @@ -354,10 +354,10 @@ SELECT * FROM pivout ORDER BY id; id | 0 val_avg_piv_10 | 1.5 val_avg_piv_20 | 3 -val_avg_piv_30 | +val_avg_piv_30 | val2_avg_piv_10 | 11.5 val2_avg_piv_20 | 13 -val2_avg_piv_30 | +val2_avg_piv_30 | -[ RECORD 2 ]---+----- id | 1 val_avg_piv_10 | 7 @@ -381,10 +381,10 @@ SELECT * FROM pivout ORDER BY id; id | 0 val_avg_piv_10 | 1.5 val_avg_piv_20 | 3 -val_avg_piv_30 | +val_avg_piv_30 | val_sum_piv_10 | 3 val_sum_piv_20 | 3 -val_sum_piv_30 | +val_sum_piv_30 | -[ RECORD 2 ]--+---- id | 1 val_avg_piv_10 | 7 @@ -435,13 +435,13 @@ SELECT * FROM pivout ORDER BY id; id | 0 val_avg_piv_10 | 1.5 val_avg_piv_20 | 3 -val_avg_piv_30 | +val_avg_piv_30 | val2_avg_piv_10 | 11.5 val2_avg_piv_20 | 13 -val2_avg_piv_30 | +val2_avg_piv_30 | val2_sum_piv_10 | 23 val2_sum_piv_20 | 13 -val2_sum_piv_30 | +val2_sum_piv_30 | -[ RECORD 2 ]---+----- id | 1 val_avg_piv_10 | 7 @@ -577,7 +577,7 @@ SELECT madlib.pivot('pivset_ext', 'pivout', 'id, id2', 'piv, piv2', 'val, val2', SELECT * FROM pivout_dictionary;
    -  __pivot_cid__ | pval | agg | piv | piv2 |           col_name           
    +  __pivot_cid__ | pval | agg | piv | piv2 |           col_name
     ---------------+------+-----+-----+------+------------------------------
      __p_1__       | val  | avg |     |  100 | "val_avg_piv_null_piv2_100"
      __p_5__       | val  | avg |  10 |  100 | "val_avg_piv_10_piv2_100"
    
    From c19e8731aab3dc3e5abedd08724954a9c4ab6e6f Mon Sep 17 00:00:00 2001
    From: Orhan Kislal 
    Date: Mon, 12 Dec 2016 10:25:11 -0800
    Subject: [PATCH 2/5] SSSP: minor fixes for neg. cycles and error messages.
    
    ---
     src/ports/postgres/modules/graph/sssp.py_in | 33 +++++++++++----------
     1 file changed, 17 insertions(+), 16 deletions(-)
    
    diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
    index 5d5ee786c..bd708dd9f 100644
    --- a/src/ports/postgres/modules/graph/sssp.py_in
    +++ b/src/ports/postgres/modules/graph/sssp.py_in
    @@ -110,8 +110,8 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     			""".format(**locals()))
     
     		v_cnt = plpy.execute("SELECT count(*) FROM {vertex_table}".
    -			format(**locals()))
    -		for i in range(0,v_cnt[0]['count']):
    +			format(**locals()))[0]['count']
    +		for i in range(0,v_cnt+1):
     
     			plpy.execute("TRUNCATE TABLE {0}".format(toupdate))
     			plpy.execute(
    @@ -168,18 +168,7 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     						AND toupdate.val + edge_table.{weight} = x.val
     						""".format(**locals()))
     
    -		ncycle = plpy.execute(
    -			""" SELECT out_table1.{vertex_id} AS v1,
    -					out_table2.{vertex_id} AS v2
    -				FROM {out_table} AS out_table1 , {out_table} AS out_table2,
    -					{edge_table} AS edge_table
    -				WHERE out_table1.{vertex_id} = edge_table.{src} AND
    -					out_table2.{vertex_id} = edge_table.{dest} AND
    -					out_table1.{weight} + edge_table.{weight} <
    -					out_table2.{weight}
    -				LIMIT 1
    -			""".format(**locals()))
    -		if ncycle:
    +		if i == v_cnt:
     			plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
     
     		sql = m4_ifdef(,
    @@ -203,7 +192,8 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
     	ret = [dest_vertex]
     	sql = "SELECT parent FROM {sssp_table} WHERE {id} = {cur} LIMIT 1"
     	parent = plpy.execute(sql.format(**locals()))
    -	if parent is None:
    +
    +	if parent.nrows() == 0:
     		plpy.error(
     			"Graph SSSP: Vertex {0} is not present in the sssp table {1}".
     			format(dest_vertex,sssp_table))
    @@ -237,7 +227,7 @@ def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
     		"Graph SSSP: Vertex table ({0}) is empty!".format(vertex_table))
     
     	_assert(edge_table and edge_table.strip().lower() not in ('null', ''),
    -		"Graph SSSP: Invalid vertex table name!")
    +		"Graph SSSP: Invalid edge table name!")
     	_assert(table_exists(edge_table),
     		"Graph SSSP: Edge table ({0}) is missing!".format(edge_table))
     	_assert(not table_is_empty(edge_table),
    @@ -251,6 +241,17 @@ def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
     		"Graph SSSP: Not all columns from {0} present in edge table ({1})".
     		format(edge_params.values(), edge_table))
     
    +	_assert(isinstance(source_vertex,int), """Graph SSSP: Source vertex
    +		{source_vertex} has to be an integer """.format(**locals()))
    +	src_exists = plpy.execute("""
    +		SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
    +		""".format(**locals()))
    +
    +	if src_exists.nrows() == 0:
    +		plpy.error(
    +			"""Graph SSSP: Source vertex {source_vertex} is not present in the
    +				vertex table {vertex_table}""".format(**locals()))
    +
     	return None
     
     def validate_get_path(sssp_table, dest_vertex, **kwargs):
    
    From cdaef2af0428063a598eb2954d1d6c8c28362cb5 Mon Sep 17 00:00:00 2001
    From: Orhan Kislal 
    Date: Mon, 12 Dec 2016 11:46:12 -0800
    Subject: [PATCH 3/5] SSSP: Minor error message changes
    
    ---
     src/ports/postgres/modules/graph/sssp.py_in | 13 +++++++------
     1 file changed, 7 insertions(+), 6 deletions(-)
    
    diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
    index bd708dd9f..e08aee4e1 100644
    --- a/src/ports/postgres/modules/graph/sssp.py_in
    +++ b/src/ports/postgres/modules/graph/sssp.py_in
    @@ -234,15 +234,16 @@ def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
     		"Graph SSSP: Edge table ({0}) is empty!".format(edge_table))
     
     	existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
    -	_assert(vertex_id in existing_cols, """Graph SSSP: The {vertex_id}
    -		column is not present in vertex table ({vertex_table})
    -		""".format(**locals()))
    +	_assert(vertex_id in existing_cols,
    +		"""Graph SSSP: The vertex column {vertex_id} is not present in vertex
    +		table ({vertex_table}) """.format(**locals()))
     	_assert(columns_exist_in_table(edge_table, edge_params.values()),
     		"Graph SSSP: Not all columns from {0} present in edge table ({1})".
     		format(edge_params.values(), edge_table))
     
    -	_assert(isinstance(source_vertex,int), """Graph SSSP: Source vertex
    -		{source_vertex} has to be an integer """.format(**locals()))
    +	_assert(isinstance(source_vertex,int),
    +		"""Graph SSSP: Source vertex {source_vertex} has to be an integer """.
    +		format(**locals()))
     	src_exists = plpy.execute("""
     		SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
     		""".format(**locals()))
    @@ -250,7 +251,7 @@ def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
     	if src_exists.nrows() == 0:
     		plpy.error(
     			"""Graph SSSP: Source vertex {source_vertex} is not present in the
    -				vertex table {vertex_table}""".format(**locals()))
    +			vertex table {vertex_table}""".format(**locals()))
     
     	return None
     
    
    From f4c395812442d9f7dc023701177724a27abf6933 Mon Sep 17 00:00:00 2001
    From: Orhan Kislal 
    Date: Thu, 15 Dec 2016 14:32:50 -0800
    Subject: [PATCH 4/5] Graph: SSSP
    
    Eliminates the need for the message table for better performance.
    Adds support for double precision weights.
    Adds query planning to get path function.
    Adds indices for the locally created tables.
    ---
     src/ports/postgres/modules/graph/sssp.py_in  | 121 +++++++++++--------
     src/ports/postgres/modules/graph/sssp.sql_in |   2 +-
     2 files changed, 74 insertions(+), 49 deletions(-)
    
    diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
    index e08aee4e1..79610d810 100644
    --- a/src/ports/postgres/modules/graph/sssp.py_in
    +++ b/src/ports/postgres/modules/graph/sssp.py_in
    @@ -43,7 +43,8 @@ m4_changequote(`')
     def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     		edge_args, source_vertex, out_table, **kwargs):
     	"""
    -    Single source shortest path function for graphs
    +    Single source shortest path function for graphs using the Bellman-Ford
    +    algorithm [1].
         Args:
             @param vertex_table     Name of the table that contains the vertex data.
             @param vertex_id        Name of the column containing the vertex ids.
    @@ -52,14 +53,19 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
             						named arguments of the form "name=value".
             @param source_vertex    The source vertex id for the algorithm to start.
             @param out_table   	    Name of the table to store the result of SSSP.
    +
    +    [1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
         """
     
     	with MinWarning("warning"):
     
     		INT_MAX = 2147483647
    +		EPSILON = 1.0E-06
     
     		message = unique_string(desp='message')
    -		toupdate = unique_string(desp='toupdate')
    +
    +		oldupdate = unique_string(desp='oldupdate')
    +		newupdate = unique_string(desp='newupdate')
     
     		params_types = {'src': str, 'dest': str, 'weight': str}
     		default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
    @@ -81,51 +87,50 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     		validate_graph_coding(vertex_table, vertex_id, edge_table,
     			edge_params, source_vertex, out_table)
     
    -		plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(message,toupdate))
    +		plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
    +			message,oldupdate,newupdate))
     
     		plpy.execute(
     			""" CREATE TABLE {out_table} AS
     				SELECT {vertex_id}::INT AS {vertex_id},
    -					CAST({INT_MAX} AS INT) AS {weight},
    +					CAST('Infinity' AS DOUBLE PRECISION) AS {weight},
     					CAST({INT_MAX} AS INT) AS parent
     				FROM {vertex_table} {distribution} """.format(**locals()))
    -
     		plpy.execute(
    -			""" CREATE TEMP TABLE {message}(
    -				id INT, val INT, parent INT)
    -				{local_distribution} """.format(**locals()))
    +			""" CREATE TEMP TABLE {oldupdate}(
    +				id INT, val DOUBLE PRECISION, parent INT)
    +				{local_distribution}
    +				""".format(**locals()))
     		plpy.execute(
    -			""" CREATE TEMP TABLE {toupdate}(
    -				id INT, val INT, parent INT)
    -				{local_distribution} """.format(**locals()))
    +			""" CREATE TEMP TABLE {newupdate}(
    +				id INT, val DOUBLE PRECISION, parent INT)
    +				{local_distribution}
    +				""".format(**locals()))
     		temp_table = unique_string(desp='temp')
     		sql = m4_ifdef(,
     			""" CREATE TABLE {temp_table} (
    -					{vertex_id} INT, {weight} INT, parent INT) {distribution};
    +					{vertex_id} INT, {weight} DOUBLE PRECISION, parent INT)
    +					{distribution};
     			""",  )
     		plpy.execute(sql.format(**locals()))
     
    +		sql_index = m4_ifdef(,
    +			,
    +			)
    +		plpy.execute(sql_index)
    +
     		plpy.execute(
    -			""" INSERT INTO {message} VALUES({source_vertex},0,{source_vertex})
    +			""" INSERT INTO {oldupdate}
    +				VALUES({source_vertex},0,{source_vertex})
     			""".format(**locals()))
     
     		v_cnt = plpy.execute("SELECT count(*) FROM {vertex_table}".
     			format(**locals()))[0]['count']
     		for i in range(0,v_cnt+1):
     
    -			plpy.execute("TRUNCATE TABLE {0}".format(toupdate))
    -			plpy.execute(
    -				""" INSERT INTO {toupdate}
    -					SELECT DISTINCT ON (message.id) message.id AS id,
    -						message.val AS val,
    -						message.parent AS parent
    -					FROM {message} AS message, {out_table} AS out_table
    -					WHERE message.id = out_table.{vertex_id}
    -						AND message.val,
     				,
     				)
     			plpy.execute(sql.format(**locals()))
    -			plpy.execute("TRUNCATE TABLE {0}".format(message))
    -			plpy.execute(
    -				""" INSERT INTO {message}
    -					SELECT edge_table.{dest} AS id, x.val AS val,
    -						toupdate.id AS parent
    -					FROM {toupdate} AS toupdate, {edge_table} AS edge_table, (
    -						SELECT edge_table.{dest} AS id,
    -							min(toupdate.val + edge_table.{weight}) AS val
    -						FROM {toupdate} AS toupdate,
    +
    +			plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
    +			sql = ("""
    +				INSERT INTO {newupdate}
    +				SELECT DISTINCT ON (message.id) message.id AS id,
    +					message.val AS val,
    +					message.parent AS parent
    +				FROM  {out_table} AS out_table,
    +					(SELECT edge_table.{dest} AS id, x.val AS val,
    +						oldupdate.id AS parent
    +					FROM {oldupdate} AS oldupdate, {edge_table} AS edge_table,
    +						(SELECT edge_table.{dest} AS id,
    +							min(oldupdate.val + edge_table.{weight}) AS val
    +						FROM {oldupdate} AS oldupdate,
     							{edge_table} AS edge_table
    -						WHERE edge_table.{src}=toupdate.id
    +						WHERE edge_table.{src}=oldupdate.id
     						GROUP BY edge_table.{dest}) x
    -					WHERE edge_table.{src} = toupdate.id
    +					WHERE edge_table.{src} = oldupdate.id
     						AND edge_table.{dest} = x.id
    -						AND toupdate.val + edge_table.{weight} = x.val
    -						""".format(**locals()))
    +						AND ABS(oldupdate.val + edge_table.{weight} - x.val) <
    +							{EPSILON}
    +					) AS message
    +				WHERE message.id = out_table.{vertex_id}
    +					AND message.val,
     			""" DROP TABLE {temp_table} """,  )
    @@ -190,7 +211,12 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
     	cols = get_cols(sssp_table)
     	id = cols[0]
     	ret = [dest_vertex]
    -	sql = "SELECT parent FROM {sssp_table} WHERE {id} = {cur} LIMIT 1"
    +	plan_name = unique_string(desp='plan')
    +
    +	plpy.execute(""" PREPARE {plan_name} (int) AS
    +		SELECT parent FROM {sssp_table} WHERE {id} = $1 LIMIT 1
    +		""".format(**locals()))
    +	sql = "EXECUTE {plan_name} ({cur})"
     	parent = plpy.execute(sql.format(**locals()))
     
     	if parent.nrows() == 0:
    @@ -206,7 +232,6 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
     		else:
     			ret.append(parent)
     			cur = parent
    -
     		parent = plpy.execute(sql.format(**locals()))
     
     	return None
    diff --git a/src/ports/postgres/modules/graph/sssp.sql_in b/src/ports/postgres/modules/graph/sssp.sql_in
    index 6b82f6dcb..479f9e06d 100644
    --- a/src/ports/postgres/modules/graph/sssp.sql_in
    +++ b/src/ports/postgres/modules/graph/sssp.sql_in
    @@ -232,7 +232,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp_get_path(
     
     ) RETURNS INT[] AS $$
         PythonFunction(graph, sssp, graph_sssp_get_path)
    -$$ LANGUAGE plpythonu IMMUTABLE
    +$$ LANGUAGE plpythonu VOLATILE
     m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `CONTAINS SQL', `');
     -------------------------------------------------------------------------
     
    
    From ae6ebd2d1cc75aab038af8639e630e7a966e28d3 Mon Sep 17 00:00:00 2001
    From: Orhan Kislal 
    Date: Mon, 19 Dec 2016 13:18:32 -0800
    Subject: [PATCH 5/5] Graph: SSSP
    
    - Adds comments to explain the code.
    - Fixes a HAWQ bug
    - Converts JOINs to INNER JOIN ON clauses
    ---
     src/ports/postgres/modules/graph/sssp.py_in | 98 +++++++++++++++------
     1 file changed, 71 insertions(+), 27 deletions(-)
    
    diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
    index 79610d810..c60d2c425 100644
    --- a/src/ports/postgres/modules/graph/sssp.py_in
    +++ b/src/ports/postgres/modules/graph/sssp.py_in
    @@ -60,7 +60,7 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     	with MinWarning("warning"):
     
     		INT_MAX = 2147483647
    -		EPSILON = 1.0E-06
    +		EPSILON = 0.000001
     
     		message = unique_string(desp='message')
     
    @@ -90,12 +90,19 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     		plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
     			message,oldupdate,newupdate))
     
    +		# We keep a table of every vertex, the minimum cost to that destination
    +		# seen so far and the parent to this vertex in the associated shortest
    +		# path. This table will be updated throughout the execution.
     		plpy.execute(
     			""" CREATE TABLE {out_table} AS
    -				SELECT {vertex_id}::INT AS {vertex_id},
    +				SELECT {vertex_id} AS {vertex_id},
     					CAST('Infinity' AS DOUBLE PRECISION) AS {weight},
    -					CAST({INT_MAX} AS INT) AS parent
    +					NULL::INT AS parent
     				FROM {vertex_table} {distribution} """.format(**locals()))
    +
    +		# We keep 2 update tables and alternate them during the execution.
    +		# This is necessary since we need to know which vertices are updated in
    +		# the previous iteration to calculate the next set of updates.
     		plpy.execute(
     			""" CREATE TEMP TABLE {oldupdate}(
     				id INT, val DOUBLE PRECISION, parent INT)
    @@ -106,6 +113,9 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     				id INT, val DOUBLE PRECISION, parent INT)
     				{local_distribution}
     				""".format(**locals()))
    +
    +		# Since HAWQ does not allow us to update, we create a new table and
    +		# rename at every iteration
     		temp_table = unique_string(desp='temp')
     		sql = m4_ifdef(,
     			""" CREATE TABLE {temp_table} (
    @@ -114,6 +124,8 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     			""",  )
     		plpy.execute(sql.format(**locals()))
     
    +		# GPDB and HAWQ have distributed by clauses to help them with indexing.
    +		# For Postgres we add the indices manually.
     		sql_index = m4_ifdef(,
     			)
     		plpy.execute(sql_index)
     
    +		# The source can be reached with 0 cost and it has itself as the parent.
     		plpy.execute(
     			""" INSERT INTO {oldupdate}
     				VALUES({source_vertex},0,{source_vertex})
    @@ -131,6 +144,7 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     			format(**locals()))[0]['count']
     		for i in range(0,v_cnt+1):
     
    +			# Apply the updates calculated in the last iteration
     			sql = m4_ifdef(,
     				,
     				)
     			plpy.execute(sql.format(**locals()))
     
    +
     			plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
    -			sql = ("""
    -				INSERT INTO {newupdate}
    +
    +			# 'oldupdate' table has the update info from the last iteration
    +
    +			# Consider every edge that has an updated source
    +			# From these edges:
    +			# For every destination vertex, find the min total cost to reach.
    +			# Note that, just calling an aggregate function with group by won't
    +			# let us store the src field of the edge (needed for the parent).
    +			# This is why we need the 'x'; it gives a list of destinations and
    +			# associated min values. Using these values, we identify which edge
    +			# is selected.
    +
    +			# Since using '=' with floats is dangerous we use an epsilon value
    +			# for comparison.
    +
    +			# Once we have a list of edges and values (stored as 'message'),
    +			# we check if these values are lower than the existing shortest path
    +			# values.
    +
    +			sql = (""" INSERT INTO {newupdate}
     				SELECT DISTINCT ON (message.id) message.id AS id,
     					message.val AS val,
     					message.parent AS parent
    -				FROM  {out_table} AS out_table,
    -					(SELECT edge_table.{dest} AS id, x.val AS val,
    -						oldupdate.id AS parent
    -					FROM {oldupdate} AS oldupdate, {edge_table} AS edge_table,
    -						(SELECT edge_table.{dest} AS id,
    -							min(oldupdate.val + edge_table.{weight}) AS val
    -						FROM {oldupdate} AS oldupdate,
    -							{edge_table} AS edge_table
    -						WHERE edge_table.{src}=oldupdate.id
    -						GROUP BY edge_table.{dest}) x
    -					WHERE edge_table.{src} = oldupdate.id
    -						AND edge_table.{dest} = x.id
    -						AND ABS(oldupdate.val + edge_table.{weight} - x.val) <
    -							{EPSILON}
    -					) AS message
    -				WHERE message.id = out_table.{vertex_id}
    -					AND message.val,
    -			""" DROP TABLE {temp_table} """,  )
    -		plpy.execute(sql.format(**locals()))
    +		m4_ifdef(,
    +			plpy.execute("DROP TABLE {temp_table} ".format(**locals())), )
     
     	return None
     
    @@ -206,6 +246,7 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
             @param out_table	The vertex that will be the destination of the
                 				desired path.
     	"""
    +
     	validate_get_path(sssp_table, dest_vertex)
     	cur = dest_vertex
     	cols = get_cols(sssp_table)
    @@ -213,6 +254,9 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
     	ret = [dest_vertex]
     	plan_name = unique_string(desp='plan')
     
    +	# Follow the 'parent' chain until you reach the source.
    +	# We don't need to know what the source is since it is the only vertex with
    +	# itself as its parent
     	plpy.execute(""" PREPARE {plan_name} (int) AS
     		SELECT parent FROM {sssp_table} WHERE {id} = $1 LIMIT 1
     		""".format(**locals()))