From d5821a12b6c0d35ffa043c58efba922aecfd9133 Mon Sep 17 00:00:00 2001 From: Weibin Zeng Date: Mon, 1 Nov 2021 14:48:33 +0800 Subject: [PATCH] Add a flattened wrapper on property graph to run apps defined on simple graph (#888) Signed-off-by: acezen --- .../apps/boundary/edge_boundary.h | 35 +- .../apps/boundary/node_boundary.h | 33 +- analytical_engine/apps/boundary/utils.h | 43 + .../centrality/degree/degree_centrality.h | 41 +- .../apps/clustering/avg_clustering.h | 190 ++--- .../apps/clustering/clustering.h | 190 ++--- analytical_engine/apps/hits/hits.h | 22 +- .../apps/pagerank/pagerank_networkx.h | 63 +- .../simple_path/all_simple_paths_context.h | 10 +- .../apps/simple_path/is_simple_path_context.h | 13 +- analytical_engine/apps/simple_path/utils.h | 52 -- .../core/context/tensor_context.h | 1 + .../core/context/vertex_data_context.h | 1 + .../core/fragment/arrow_flattened_fragment.h | 773 ++++++++++++++++++ .../core/fragment/arrow_projected_fragment.h | 5 +- .../core/fragment/dynamic_fragment.h | 2 +- .../fragment/dynamic_projected_fragment.h | 286 +++---- analytical_engine/core/grape_instance.cc | 5 +- .../core/object/fragment_wrapper.h | 76 +- .../core/object/i_fragment_wrapper.h | 2 +- analytical_engine/frame/project_frame.cc | 48 ++ .../gscoordinator/builtin/app/.gs_conf.yaml | 3 +- coordinator/gscoordinator/utils.py | 24 +- proto/graph_def.proto | 5 +- python/graphscope/analytical/app/__init__.py | 2 + .../app/average_shortest_path_length.py | 49 ++ python/graphscope/analytical/app/k_core.py | 2 +- python/graphscope/analytical/app/triangles.py | 2 +- python/graphscope/framework/app.py | 3 +- python/graphscope/framework/dag_utils.py | 42 + python/graphscope/nx/algorithms/builtin.py | 90 +- python/graphscope/nx/classes/graph.py | 36 +- .../algorithms/builtin/test_shortest_paths.py | 10 +- .../graphscope/nx/tests/classes/test_graph.py | 164 ++-- python/graphscope/nx/tests/conftest.py | 2 - .../graphscope/nx/tests/test_copy_on_write.py | 366 +++++++++ .../graphscope/nx/tests/test_ctx_builtin.py | 56 +- .../{test_nx.py => test_transformation.py} | 137 +--- 38 files changed, 2084 insertions(+), 800 deletions(-) create mode 100644 analytical_engine/apps/boundary/utils.h delete mode 100644 analytical_engine/apps/simple_path/utils.h create mode 100644 analytical_engine/core/fragment/arrow_flattened_fragment.h create mode 100644 python/graphscope/analytical/app/average_shortest_path_length.py create mode 100644 python/graphscope/nx/tests/test_copy_on_write.py rename python/graphscope/nx/tests/{test_nx.py => test_transformation.py} (85%) diff --git a/analytical_engine/apps/boundary/edge_boundary.h b/analytical_engine/apps/boundary/edge_boundary.h index e37ccbd6f0ba..14412a2adad7 100644 --- a/analytical_engine/apps/boundary/edge_boundary.h +++ b/analytical_engine/apps/boundary/edge_boundary.h @@ -23,6 +23,7 @@ limitations under the License. #include "grape/grape.h" #include "apps/boundary/edge_boundary_context.h" +#include "apps/boundary/utils.h" #include "core/app/app_base.h" namespace gs { @@ -46,36 +47,36 @@ class EdgeBoundary : public AppBase>, void PEval(const fragment_t& frag, context_t& ctx, message_manager_t& messages) { // parse input node array from json - folly::dynamic node_array_1 = folly::parseJson(ctx.nbunch1); - std::set node_gid_set, node_gid_set_2; + folly::dynamic source_array = folly::parseJson(ctx.nbunch1); + std::set source_gid_set, target_gid_set; vid_t gid; vertex_t u; - for (const auto& oid : node_array_1) { - if (frag.Oid2Gid(oid, gid)) { - node_gid_set.insert(gid); + for (const auto& node : source_array) { + if (frag.Oid2Gid(dynamic_to_oid(node), gid)) { + source_gid_set.insert(gid); } } if (!ctx.nbunch2.empty()) { - auto node_array_2 = folly::parseJson(ctx.nbunch2); - for (const auto& oid : node_array_2) { - if (frag.Oid2Gid(oid, gid)) { - node_gid_set_2.insert(gid); + auto target_array = folly::parseJson(ctx.nbunch2); + for (const auto& node : target_array) { + if (frag.Oid2Gid(dynamic_to_oid(node), gid)) { + target_gid_set.insert(gid); } } } // get the boundary - for (auto& gid : node_gid_set) { + for (auto& gid : source_gid_set) { if (frag.InnerVertexGid2Vertex(gid, u)) { - for (auto e : frag.GetOutgoingAdjList(u)) { - vid_t vgid = frag.Vertex2Gid(e.get_neighbor()); - if (node_gid_set_2.empty()) { - if (node_gid_set.find(vgid) == node_gid_set.end()) { - ctx.boundary.insert(std::make_pair(gid, vgid)); + for (auto& e : frag.GetOutgoingAdjList(u)) { + vid_t v_gid = frag.Vertex2Gid(e.get_neighbor()); + if (target_gid_set.empty()) { + if (source_gid_set.find(v_gid) == source_gid_set.end()) { + ctx.boundary.insert(std::make_pair(gid, v_gid)); } } else { - if (node_gid_set_2.find(vgid) != node_gid_set_2.end()) { - ctx.boundary.insert(std::make_pair(gid, vgid)); + if (target_gid_set.find(v_gid) != target_gid_set.end()) { + ctx.boundary.insert(std::make_pair(gid, v_gid)); } } } diff --git a/analytical_engine/apps/boundary/node_boundary.h b/analytical_engine/apps/boundary/node_boundary.h index 35641ae0c409..97aec0fc7d95 100644 --- a/analytical_engine/apps/boundary/node_boundary.h +++ b/analytical_engine/apps/boundary/node_boundary.h @@ -23,6 +23,7 @@ limitations under the License. #include "grape/grape.h" #include "apps/boundary/node_boundary_context.h" +#include "apps/boundary/utils.h" #include "core/app/app_base.h" namespace gs { @@ -46,33 +47,33 @@ class NodeBoundary : public AppBase>, void PEval(const fragment_t& frag, context_t& ctx, message_manager_t& messages) { // parse input node array from json - folly::dynamic node_array_1 = folly::parseJson(ctx.nbunch1); - std::set node_gid_set, node_gid_set_2; + folly::dynamic source_array = folly::parseJson(ctx.nbunch1); + std::set source_gid_set, target_gid_set; vid_t gid; vertex_t v; - for (const auto& oid : node_array_1) { - if (frag.Oid2Gid(oid, gid)) { - node_gid_set.insert(gid); + for (const auto& node : source_array) { + if (frag.Oid2Gid(dynamic_to_oid(node), gid)) { + source_gid_set.insert(gid); } } if (!ctx.nbunch2.empty()) { - auto node_array_2 = folly::parseJson(ctx.nbunch2); - for (const auto& oid : node_array_2) { - if (frag.Oid2Gid(oid, gid)) { - node_gid_set_2.insert(gid); + auto target_array = folly::parseJson(ctx.nbunch2); + for (const auto& node : target_array) { + if (frag.Oid2Gid(dynamic_to_oid(node), gid)) { + target_gid_set.insert(gid); } } } // get the boundary - for (auto& gid : node_gid_set) { + for (auto& gid : source_gid_set) { if (frag.InnerVertexGid2Vertex(gid, v)) { - for (auto e : frag.GetOutgoingAdjList(v)) { - vid_t gid = frag.Vertex2Gid(e.get_neighbor()); - if (node_gid_set.find(gid) == node_gid_set.end() && - (node_gid_set_2.empty() || - node_gid_set_2.find(gid) != node_gid_set_2.end())) { - ctx.boundary.insert(gid); + for (auto& e : frag.GetOutgoingAdjList(v)) { + vid_t v_gid = frag.Vertex2Gid(e.get_neighbor()); + if (source_gid_set.find(v_gid) == source_gid_set.end() && + (target_gid_set.empty() || + target_gid_set.find(v_gid) != target_gid_set.end())) { + ctx.boundary.insert(v_gid); } } } diff --git a/analytical_engine/apps/boundary/utils.h b/analytical_engine/apps/boundary/utils.h new file mode 100644 index 000000000000..e22abf6167c2 --- /dev/null +++ b/analytical_engine/apps/boundary/utils.h @@ -0,0 +1,43 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANALYTICAL_ENGINE_APPS_BOUNDARY_UTILS_H_ +#define ANALYTICAL_ENGINE_APPS_BOUNDARY_UTILS_H_ + +#include + +#include "folly/dynamic.h" + +namespace gs { +template +oid_t dynamic_to_oid(const folly::dynamic& node) {} + +template <> +int64_t dynamic_to_oid(const folly::dynamic& node) { + return node.asInt(); +} + +template <> +std::string dynamic_to_oid(const folly::dynamic& node) { + return node.asString(); +} + +template <> +folly::dynamic dynamic_to_oid(const folly::dynamic& node) { + return node; +} +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_BOUNDARY_UTILS_H_ diff --git a/analytical_engine/apps/centrality/degree/degree_centrality.h b/analytical_engine/apps/centrality/degree/degree_centrality.h index f8e53530a1cc..da16a1a789ac 100644 --- a/analytical_engine/apps/centrality/degree/degree_centrality.h +++ b/analytical_engine/apps/centrality/degree/degree_centrality.h @@ -43,26 +43,27 @@ class DegreeCentrality auto inner_vertices = frag.InnerVertices(); double max_degree = static_cast(frag.GetTotalVerticesNum() - 1); - ForEach(inner_vertices, [&frag, &ctx, max_degree](int tid, vertex_t v) { - switch (ctx.degree_centrality_type) { - case DegreeCentralityType::IN: { - ctx.centrality[v] = - static_cast(frag.GetLocalInDegree(v)) / max_degree; - break; - } - case DegreeCentralityType::OUT: { - ctx.centrality[v] = - static_cast(frag.GetLocalOutDegree(v)) / max_degree; - break; - } - case DegreeCentralityType::BOTH: { - double degree = static_cast(frag.GetLocalInDegree(v) + - frag.GetLocalOutDegree(v)); - ctx.centrality[v] = degree / max_degree; - break; - } - } - }); + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&frag, &ctx, max_degree](int tid, vertex_t v) { + switch (ctx.degree_centrality_type) { + case DegreeCentralityType::IN: { + ctx.centrality[v] = + static_cast(frag.GetLocalInDegree(v)) / max_degree; + break; + } + case DegreeCentralityType::OUT: { + ctx.centrality[v] = + static_cast(frag.GetLocalOutDegree(v)) / max_degree; + break; + } + case DegreeCentralityType::BOTH: { + double degree = static_cast(frag.GetLocalInDegree(v) + + frag.GetLocalOutDegree(v)); + ctx.centrality[v] = degree / max_degree; + break; + } + } + }); } void IncEval(const fragment_t& frag, context_t& ctx, diff --git a/analytical_engine/apps/clustering/avg_clustering.h b/analytical_engine/apps/clustering/avg_clustering.h index 0f84e8072adc..7bf74f443872 100644 --- a/analytical_engine/apps/clustering/avg_clustering.h +++ b/analytical_engine/apps/clustering/avg_clustering.h @@ -48,12 +48,13 @@ class AvgClustering messages.InitChannels(thread_num()); ctx.stage = 0; - ForEach(inner_vertices, [&messages, &frag, &ctx](int tid, vertex_t v) { - ctx.global_degree[v] = - frag.GetLocalOutDegree(v) + frag.GetLocalInDegree(v); - messages.SendMsgThroughEdges(frag, v, - ctx.global_degree[v], tid); - }); + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&messages, &frag, &ctx](int tid, vertex_t v) { + ctx.global_degree[v] = + frag.GetLocalOutDegree(v) + frag.GetLocalInDegree(v); + messages.SendMsgThroughEdges( + frag, v, ctx.global_degree[v], tid); + }); messages.ForceContinue(); } @@ -69,91 +70,91 @@ class AvgClustering thread_num(), frag, [&ctx](int tid, vertex_t u, int msg) { ctx.global_degree[u] = msg; }); - ForEach(inner_vertices, [&frag, &ctx, &messages, &vertices](int tid, - vertex_t v) { - vid_t u_gid, v_gid; - auto& nbr_vec = ctx.complete_neighbor[v]; - int degree = ctx.global_degree[v]; - nbr_vec.reserve(degree); - std::vector> msg_vec; - msg_vec.reserve(degree); - - typename FRAG_T::template vertex_array_t is_rec; - is_rec.Init(vertices, 0); - auto es = frag.GetOutgoingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - is_rec[u]++; - } - es = frag.GetIncomingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - is_rec[u]++; - if (is_rec[u] == 2) { - ctx.rec_degree[v]++; - } - } + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&frag, &ctx, &messages, &vertices](int tid, vertex_t v) { + vid_t u_gid, v_gid; + auto& nbr_vec = ctx.complete_neighbor[v]; + int degree = ctx.global_degree[v]; + nbr_vec.reserve(degree); + std::vector> msg_vec; + msg_vec.reserve(degree); + + typename FRAG_T::template vertex_array_t is_rec; + is_rec.Init(vertices, 0); + auto es = frag.GetOutgoingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + is_rec[u]++; + } + es = frag.GetIncomingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + is_rec[u]++; + if (is_rec[u] == 2) { + ctx.rec_degree[v]++; + } + } - es = frag.GetOutgoingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - if (ctx.global_degree[u] < ctx.global_degree[v]) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 2) { - msg.second = 2; - } else { - msg.second = 1; - } - msg_vec.push_back(msg); - nbr_vec.push_back(std::make_pair(u, msg.second)); - } else if (ctx.global_degree[u] == ctx.global_degree[v]) { - u_gid = frag.Vertex2Gid(u); - v_gid = frag.GetInnerVertexGid(v); - if (v_gid > u_gid) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 2) { - msg.second = 2; - } else { - msg.second = 1; - } - nbr_vec.push_back(std::make_pair(u, msg.second)); - msg_vec.push_back(msg); - } - } - } + es = frag.GetOutgoingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + if (ctx.global_degree[u] < ctx.global_degree[v]) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 2) { + msg.second = 2; + } else { + msg.second = 1; + } + msg_vec.push_back(msg); + nbr_vec.push_back(std::make_pair(u, msg.second)); + } else if (ctx.global_degree[u] == ctx.global_degree[v]) { + u_gid = frag.Vertex2Gid(u); + v_gid = frag.GetInnerVertexGid(v); + if (v_gid > u_gid) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 2) { + msg.second = 2; + } else { + msg.second = 1; + } + nbr_vec.push_back(std::make_pair(u, msg.second)); + msg_vec.push_back(msg); + } + } + } - es = frag.GetIncomingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - if (ctx.global_degree[u] < ctx.global_degree[v]) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 1) { - msg.second = 1; - msg_vec.push_back(msg); - nbr_vec.push_back(std::make_pair(u, 1)); - } - } else if (ctx.global_degree[u] == ctx.global_degree[v]) { - u_gid = frag.Vertex2Gid(u); - v_gid = frag.GetInnerVertexGid(v); - if (v_gid > u_gid) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 1) { - msg.second = 1; - msg_vec.push_back(msg); - nbr_vec.push_back(std::make_pair(u, 1)); - } - } - } - } + es = frag.GetIncomingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + if (ctx.global_degree[u] < ctx.global_degree[v]) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 1) { + msg.second = 1; + msg_vec.push_back(msg); + nbr_vec.push_back(std::make_pair(u, 1)); + } + } else if (ctx.global_degree[u] == ctx.global_degree[v]) { + u_gid = frag.Vertex2Gid(u); + v_gid = frag.GetInnerVertexGid(v); + if (v_gid > u_gid) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 1) { + msg.second = 1; + msg_vec.push_back(msg); + nbr_vec.push_back(std::make_pair(u, 1)); + } + } + } + } - messages.SendMsgThroughEdges>>( - frag, v, msg_vec, tid); - }); + messages.SendMsgThroughEdges< + fragment_t, std::vector>>( + frag, v, msg_vec, tid); + }); messages.ForceContinue(); } else if (ctx.stage == 1) { ctx.stage = 2; @@ -195,12 +196,13 @@ class AvgClustering } } - ForEach(outer_vertices, [&messages, &frag, &ctx](int tid, vertex_t v) { - if (ctx.tricnt[v] != 0) { - messages.SyncStateOnOuterVertex(frag, v, - ctx.tricnt[v], tid); - } - }); + ForEach(outer_vertices.begin(), outer_vertices.end(), + [&messages, &frag, &ctx](int tid, vertex_t v) { + if (ctx.tricnt[v] != 0) { + messages.SyncStateOnOuterVertex( + frag, v, ctx.tricnt[v], tid); + } + }); messages.ForceContinue(); } else if (ctx.stage == 2) { ctx.stage = 3; diff --git a/analytical_engine/apps/clustering/clustering.h b/analytical_engine/apps/clustering/clustering.h index a41fbf5b189c..f80465973f66 100644 --- a/analytical_engine/apps/clustering/clustering.h +++ b/analytical_engine/apps/clustering/clustering.h @@ -48,12 +48,13 @@ class Clustering messages.InitChannels(thread_num()); ctx.stage = 0; - ForEach(inner_vertices, [&messages, &frag, &ctx](int tid, vertex_t v) { - ctx.global_degree[v] = - frag.GetLocalOutDegree(v) + frag.GetLocalInDegree(v); - messages.SendMsgThroughEdges(frag, v, - ctx.global_degree[v], tid); - }); + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&messages, &frag, &ctx](int tid, vertex_t v) { + ctx.global_degree[v] = + frag.GetLocalOutDegree(v) + frag.GetLocalInDegree(v); + messages.SendMsgThroughEdges( + frag, v, ctx.global_degree[v], tid); + }); messages.ForceContinue(); } @@ -70,91 +71,91 @@ class Clustering thread_num(), frag, [&ctx](int tid, vertex_t u, int msg) { ctx.global_degree[u] = msg; }); - ForEach(inner_vertices, [&frag, &ctx, &messages, &vertices](int tid, - vertex_t v) { - vid_t u_gid, v_gid; - auto& nbr_vec = ctx.complete_neighbor[v]; - int degree = ctx.global_degree[v]; - nbr_vec.reserve(degree); - std::vector> msg_vec; - msg_vec.reserve(degree); - - typename FRAG_T::template vertex_array_t is_rec; - is_rec.Init(vertices, 0); - auto es = frag.GetOutgoingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - is_rec[u]++; - } - es = frag.GetIncomingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - is_rec[u]++; - if (is_rec[u] == 2) { - ctx.rec_degree[v]++; - } - } + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&frag, &ctx, &messages, &vertices](int tid, vertex_t v) { + vid_t u_gid, v_gid; + auto& nbr_vec = ctx.complete_neighbor[v]; + int degree = ctx.global_degree[v]; + nbr_vec.reserve(degree); + std::vector> msg_vec; + msg_vec.reserve(degree); + + typename FRAG_T::template vertex_array_t is_rec; + is_rec.Init(vertices, 0); + auto es = frag.GetOutgoingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + is_rec[u]++; + } + es = frag.GetIncomingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + is_rec[u]++; + if (is_rec[u] == 2) { + ctx.rec_degree[v]++; + } + } - es = frag.GetOutgoingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - if (ctx.global_degree[u] < ctx.global_degree[v]) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 2) { - msg.second = 2; - } else { - msg.second = 1; - } - msg_vec.push_back(msg); - nbr_vec.push_back(std::make_pair(u, msg.second)); - } else if (ctx.global_degree[u] == ctx.global_degree[v]) { - u_gid = frag.Vertex2Gid(u); - v_gid = frag.GetInnerVertexGid(v); - if (v_gid > u_gid) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 2) { - msg.second = 2; - } else { - msg.second = 1; - } - nbr_vec.push_back(std::make_pair(u, msg.second)); - msg_vec.push_back(msg); - } - } - } + es = frag.GetOutgoingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + if (ctx.global_degree[u] < ctx.global_degree[v]) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 2) { + msg.second = 2; + } else { + msg.second = 1; + } + msg_vec.push_back(msg); + nbr_vec.push_back(std::make_pair(u, msg.second)); + } else if (ctx.global_degree[u] == ctx.global_degree[v]) { + u_gid = frag.Vertex2Gid(u); + v_gid = frag.GetInnerVertexGid(v); + if (v_gid > u_gid) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 2) { + msg.second = 2; + } else { + msg.second = 1; + } + nbr_vec.push_back(std::make_pair(u, msg.second)); + msg_vec.push_back(msg); + } + } + } - es = frag.GetIncomingAdjList(v); - for (auto& e : es) { - auto u = e.get_neighbor(); - if (ctx.global_degree[u] < ctx.global_degree[v]) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 1) { - msg.second = 1; - msg_vec.push_back(msg); - nbr_vec.push_back(std::make_pair(u, 1)); - } - } else if (ctx.global_degree[u] == ctx.global_degree[v]) { - u_gid = frag.Vertex2Gid(u); - v_gid = frag.GetInnerVertexGid(v); - if (v_gid > u_gid) { - std::pair msg; - msg.first = frag.Vertex2Gid(u); - if (is_rec[u] == 1) { - msg.second = 1; - msg_vec.push_back(msg); - nbr_vec.push_back(std::make_pair(u, 1)); - } - } - } - } + es = frag.GetIncomingAdjList(v); + for (auto& e : es) { + auto u = e.get_neighbor(); + if (ctx.global_degree[u] < ctx.global_degree[v]) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 1) { + msg.second = 1; + msg_vec.push_back(msg); + nbr_vec.push_back(std::make_pair(u, 1)); + } + } else if (ctx.global_degree[u] == ctx.global_degree[v]) { + u_gid = frag.Vertex2Gid(u); + v_gid = frag.GetInnerVertexGid(v); + if (v_gid > u_gid) { + std::pair msg; + msg.first = frag.Vertex2Gid(u); + if (is_rec[u] == 1) { + msg.second = 1; + msg_vec.push_back(msg); + nbr_vec.push_back(std::make_pair(u, 1)); + } + } + } + } - messages.SendMsgThroughEdges>>( - frag, v, msg_vec, tid); - }); + messages.SendMsgThroughEdges< + fragment_t, std::vector>>( + frag, v, msg_vec, tid); + }); messages.ForceContinue(); } else if (ctx.stage == 1) { ctx.stage = 2; @@ -196,12 +197,13 @@ class Clustering } } - ForEach(outer_vertices, [&messages, &frag, &ctx](int tid, vertex_t v) { - if (ctx.tricnt[v] != 0) { - messages.SyncStateOnOuterVertex(frag, v, - ctx.tricnt[v], tid); - } - }); + ForEach(outer_vertices.begin(), outer_vertices.end(), + [&messages, &frag, &ctx](int tid, vertex_t v) { + if (ctx.tricnt[v] != 0) { + messages.SyncStateOnOuterVertex( + frag, v, ctx.tricnt[v], tid); + } + }); messages.ForceContinue(); } else if (ctx.stage == 2) { ctx.stage = 3; diff --git a/analytical_engine/apps/hits/hits.h b/analytical_engine/apps/hits/hits.h index 23f535faa8de..2a8f036f37f0 100644 --- a/analytical_engine/apps/hits/hits.h +++ b/analytical_engine/apps/hits/hits.h @@ -83,15 +83,16 @@ class HITS : public grape::ParallelAppBase>, if (ctx.stage == AuthIteration) { hub_last.Swap(hub); - ForEach(inner_vertices, [&auth, &hub_last, &frag, &messages](int tid, - vertex_t u) { - auth[u] = 0.0; - for (auto& nbr : frag.GetIncomingAdjList(u)) { - auth[u] += hub_last[nbr.get_neighbor()]; - } - messages.Channels()[tid].SendMsgThroughEdges( - frag, u, auth[u]); - }); + ForEach( + inner_vertices.begin(), inner_vertices.end(), + [&auth, &hub_last, &frag, &messages](int tid, vertex_t u) { + auth[u] = 0.0; + for (auto& nbr : frag.GetIncomingAdjList(u)) { + auth[u] += hub_last[nbr.get_neighbor()]; + } + messages.Channels()[tid].SendMsgThroughEdges( + frag, u, auth[u]); + }); ctx.stage = HubIteration; if (frag.fnum() == 1) { @@ -104,7 +105,8 @@ class HITS : public grape::ParallelAppBase>, }); ForEach( - inner_vertices, [&hub, &auth, &frag, &messages](int tid, vertex_t u) { + inner_vertices.begin(), inner_vertices.end(), + [&hub, &auth, &frag, &messages](int tid, vertex_t u) { hub[u] = 0.0; for (auto& nbr : frag.GetOutgoingAdjList(u)) { hub[u] += auth[nbr.get_neighbor()]; diff --git a/analytical_engine/apps/pagerank/pagerank_networkx.h b/analytical_engine/apps/pagerank/pagerank_networkx.h index 1e0a5e6a561e..ae72cf96931e 100644 --- a/analytical_engine/apps/pagerank/pagerank_networkx.h +++ b/analytical_engine/apps/pagerank/pagerank_networkx.h @@ -62,14 +62,16 @@ class PageRankNetworkX double p = 1.0 / graph_vnum; // assign initial ranks - ForEach(inner_vertices, [&ctx, &frag, p, &messages](int tid, vertex_t u) { - ctx.result[u] = p; - ctx.degree[u] = static_cast(frag.GetOutgoingAdjList(u).Size()); - if (ctx.degree[u] != 0.0) { - messages.SendMsgThroughOEdges( - frag, u, ctx.result[u] / ctx.degree[u], tid); - } - }); + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&ctx, &frag, p, &messages](int tid, vertex_t u) { + ctx.result[u] = p; + ctx.degree[u] = + static_cast(frag.GetOutgoingAdjList(u).Size()); + if (ctx.degree[u] != 0.0) { + messages.SendMsgThroughOEdges( + frag, u, ctx.result[u] / ctx.degree[u], tid); + } + }); for (auto u : inner_vertices) { if (ctx.degree[u] == 0.0) { @@ -103,23 +105,25 @@ class PageRankNetworkX }); } - ForEach(inner_vertices, [&ctx](int tid, vertex_t u) { - if (ctx.degree[u] > 0.0) { - ctx.pre_result[u] = ctx.result[u] / ctx.degree[u]; - } else { - ctx.pre_result[u] = ctx.result[u]; - } - }); + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&ctx](int tid, vertex_t u) { + if (ctx.degree[u] > 0.0) { + ctx.pre_result[u] = ctx.result[u] / ctx.degree[u]; + } else { + ctx.pre_result[u] = ctx.result[u]; + } + }); double base = (1.0 - ctx.alpha) / graph_vnum + dangling_sum / graph_vnum; - ForEach(inner_vertices, [&ctx, base, &frag](int tid, vertex_t u) { - double cur = 0; - auto es = frag.GetIncomingAdjList(u); - for (auto& e : es) { - cur += ctx.pre_result[e.get_neighbor()]; - } - ctx.result[u] = cur * ctx.alpha + base; - }); + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&ctx, base, &frag](int tid, vertex_t u) { + double cur = 0; + auto es = frag.GetIncomingAdjList(u); + for (auto& e : es) { + cur += ctx.pre_result[e.get_neighbor()]; + } + ctx.result[u] = cur * ctx.alpha + base; + }); double eps = 0.0; ctx.dangling_sum = 0.0; @@ -137,12 +141,13 @@ class PageRankNetworkX return; } - ForEach(inner_vertices, [&ctx, &frag, &messages](int tid, vertex_t u) { - if (ctx.degree[u] > 0) { - messages.SendMsgThroughOEdges( - frag, u, ctx.result[u] / ctx.degree[u], tid); - } - }); + ForEach(inner_vertices.begin(), inner_vertices.end(), + [&ctx, &frag, &messages](int tid, vertex_t u) { + if (ctx.degree[u] > 0) { + messages.SendMsgThroughOEdges( + frag, u, ctx.result[u] / ctx.degree[u], tid); + } + }); double new_dangling = ctx.alpha * static_cast(ctx.dangling_sum); Sum(new_dangling, ctx.dangling_sum); diff --git a/analytical_engine/apps/simple_path/all_simple_paths_context.h b/analytical_engine/apps/simple_path/all_simple_paths_context.h index 0763b279fbeb..6b48bc51519b 100644 --- a/analytical_engine/apps/simple_path/all_simple_paths_context.h +++ b/analytical_engine/apps/simple_path/all_simple_paths_context.h @@ -27,11 +27,11 @@ Author: Ma JingYuan #include #include -#include "apps/simple_path/utils.h" #include "folly/dynamic.h" #include "folly/json.h" #include "grape/grape.h" +#include "apps/boundary/utils.h" #include "core/context/tensor_context.h" namespace gs { @@ -64,11 +64,9 @@ class AllSimplePathsContext // init targets. vid_t gid; - std::vector target_oid_array; - folly::dynamic target_nodes_id_array = folly::parseJson(targets_json); - ExtractOidArrayFromDynamic(target_nodes_id_array, target_oid_array); - for (const auto& oid : target_oid_array) { - frag.Oid2Gid(oid, gid); + folly::dynamic target_nodes_array = folly::parseJson(targets_json); + for (const auto& node : target_nodes_array) { + frag.Oid2Gid(dynamic_to_oid(node), gid); targets.insert(gid); } diff --git a/analytical_engine/apps/simple_path/is_simple_path_context.h b/analytical_engine/apps/simple_path/is_simple_path_context.h index 364f1db16f35..3f450a0155f6 100644 --- a/analytical_engine/apps/simple_path/is_simple_path_context.h +++ b/analytical_engine/apps/simple_path/is_simple_path_context.h @@ -22,11 +22,11 @@ limitations under the License. #include #include -#include "apps/simple_path/utils.h" #include "folly/dynamic.h" #include "folly/json.h" #include "grape/grape.h" +#include "apps/boundary/utils.h" #include "core/context/tensor_context.h" namespace gs { @@ -57,12 +57,10 @@ class IsSimplePathContext : public TensorContext { vertex_t source; counter = 0; vid_t p1, p2; - std::vector path_oid_array; - folly::dynamic path_nodes_id_array = folly::parseJson(nodes_json); - ExtractOidArrayFromDynamic(path_nodes_id_array, path_oid_array); - for (const auto& val : path_oid_array) { + folly::dynamic path_nodes_array = folly::parseJson(nodes_json); + for (const auto& node : path_nodes_array) { counter++; - if (!frag.Oid2Gid(val, p1)) { + if (!frag.Oid2Gid(dynamic_to_oid(node), p1)) { LOG(ERROR) << "Input oid error" << std::endl; is_simple_path = false; break; @@ -87,7 +85,8 @@ class IsSimplePathContext : public TensorContext { if (counter == 0) { is_simple_path = false; } else if (counter == 1) { - if (frag.GetInnerVertex(path_oid_array[0], source)) + if (frag.GetInnerVertex(dynamic_to_oid(path_nodes_array[0]), + source)) is_simple_path = true; else is_simple_path = false; diff --git a/analytical_engine/apps/simple_path/utils.h b/analytical_engine/apps/simple_path/utils.h deleted file mode 100644 index 2eb147d6a9c2..000000000000 --- a/analytical_engine/apps/simple_path/utils.h +++ /dev/null @@ -1,52 +0,0 @@ -/** Copyright 2020 Alibaba Group Holding Limited. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -Author: Ma JingYuan -*/ - -#ifndef ANALYTICAL_ENGINE_APPS_SIMPLE_PATH_UTILS_H_ -#define ANALYTICAL_ENGINE_APPS_SIMPLE_PATH_UTILS_H_ - -#include -#include - -#include "folly/dynamic.h" - -namespace gs { -template -void ExtractOidArrayFromDynamic(folly::dynamic node_array, - std::vector& oid_array) {} - -template <> -void ExtractOidArrayFromDynamic(folly::dynamic node_array, - std::vector& oid_array) { - for (const auto& val : node_array) { - oid_array.push_back(val.asInt()); - } -} - -template <> -void ExtractOidArrayFromDynamic( - folly::dynamic node_array, std::vector& oid_array) { - for (const auto& val : node_array) { - oid_array.push_back(val.asString()); - } -} - -template <> -void ExtractOidArrayFromDynamic( - folly::dynamic node_array, std::vector& oid_array) { - for (const auto& val : node_array) { - oid_array.push_back(val); - } -} -} // namespace gs - -#endif // ANALYTICAL_ENGINE_APPS_SIMPLE_PATH_UTILS_H_ diff --git a/analytical_engine/core/context/tensor_context.h b/analytical_engine/core/context/tensor_context.h index c9e993b95ea0..130eaf28837a 100644 --- a/analytical_engine/core/context/tensor_context.h +++ b/analytical_engine/core/context/tensor_context.h @@ -34,6 +34,7 @@ #include "core/context/i_context.h" #include "core/context/tensor_dataframe_builder.h" #include "core/error.h" +#include "core/fragment/arrow_flattened_fragment.h" #include "core/fragment/arrow_projected_fragment.h" #include "core/fragment/dynamic_projected_fragment.h" #include "core/object/i_fragment_wrapper.h" diff --git a/analytical_engine/core/context/vertex_data_context.h b/analytical_engine/core/context/vertex_data_context.h index 62a88b43f1cc..f747c27bd8c7 100644 --- a/analytical_engine/core/context/vertex_data_context.h +++ b/analytical_engine/core/context/vertex_data_context.h @@ -39,6 +39,7 @@ #include "core/context/i_context.h" #include "core/context/tensor_dataframe_builder.h" #include "core/error.h" +#include "core/fragment/arrow_flattened_fragment.h" #include "core/fragment/arrow_projected_fragment.h" #include "core/fragment/dynamic_projected_fragment.h" #include "core/utils/transform_utils.h" diff --git a/analytical_engine/core/fragment/arrow_flattened_fragment.h b/analytical_engine/core/fragment/arrow_flattened_fragment.h new file mode 100644 index 000000000000..b9cc8d53335f --- /dev/null +++ b/analytical_engine/core/fragment/arrow_flattened_fragment.h @@ -0,0 +1,773 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANALYTICAL_ENGINE_CORE_FRAGMENT_ARROW_FLATTENED_FRAGMENT_H_ +#define ANALYTICAL_ENGINE_CORE_FRAGMENT_ARROW_FLATTENED_FRAGMENT_H_ + +#include +#include +#include +#include +#include + +#include "vineyard/graph/fragment/arrow_fragment.h" + +namespace gs { + +namespace arrow_flattened_fragment_impl { + +/** + * @brief A union collection of continuous vertex ranges. The vertex ranges + * must be non-empty to construct the UnionVertexRange. + * + * @tparam T Vertex ID type. + */ +template +class UnionVertexRange { + public: + UnionVertexRange() {} + explicit UnionVertexRange( + const std::vector>& vertex_ranges) + : vertex_ranges_(std::move(vertex_ranges)) {} + + class iterator { + private: + std::reference_wrapper>> + vertex_ranges_; + grape::Vertex curr_vertex_; + size_t curr_range_index_; + + public: + iterator() = default; + explicit iterator(const std::vector>& vertex_ranges, + const grape::Vertex& c, size_t range_index) + : vertex_ranges_(vertex_ranges), + curr_vertex_(c), + curr_range_index_(range_index) {} + + const grape::Vertex& operator*() const { return curr_vertex_; } + + iterator& operator++() { + if (++curr_vertex_ == vertex_ranges_.get()[curr_range_index_].end()) { + ++curr_range_index_; + if (curr_range_index_ < vertex_ranges_.get().size()) { + curr_vertex_ = vertex_ranges_.get()[curr_range_index_].begin(); + } + } + return *this; + } + + iterator operator++(int) { + iterator ret(vertex_ranges_.get(), curr_vertex_, curr_range_index_); + ++(*this); + return ret; + } + + iterator operator+(size_t offset) const { + if (vertex_ranges_.get().empty()) { + return iterator(vertex_ranges_.get(), vertex_ranges_.get().back().end(), + 0); + } + + grape::Vertex new_vertex(curr_vertex_); + size_t new_range_index = curr_range_index_; + while (offset) { + if (new_vertex + offset < vertex_ranges_.get()[new_range_index].end()) { + new_vertex.SetValue(new_vertex.GetValue() + offset); + break; + } else if (new_range_index == vertex_ranges_.get().size() - 1) { + new_vertex = vertex_ranges_.get()[new_range_index].end(); + break; + } else { + offset -= (vertex_ranges_.get()[new_range_index].end().GetValue() - + new_vertex.GetValue()); + new_vertex = vertex_ranges_.get()[++new_range_index].begin(); + } + } + return iterator(vertex_ranges_.get(), new_vertex, new_range_index); + } + + inline bool operator==(const iterator& rhs) const { + return curr_vertex_ == rhs.curr_vertex_; + } + + inline bool operator!=(const iterator& rhs) const { + return curr_vertex_ != rhs.curr_vertex_; + } + + inline bool operator<(const iterator& rhs) const { + return curr_vertex_ < rhs.curr_vertex_; + } + }; + + iterator begin() const { + return iterator(vertex_ranges_, vertex_ranges_.front().begin(), 0); + } + iterator end() const { + return iterator(vertex_ranges_, vertex_ranges_.back().end(), + vertex_ranges_.size()); + } + + const std::vector>& GetVertexRanges() const { + return vertex_ranges_; + } + + private: + std::vector> vertex_ranges_; +}; + +/** + * @brief A wrapper of vineyard::property_graph_utils::Nbr with default + * property id to access data. + * + * @tparam VID_T. + * @tparam EID_T. + * @tparam EDATA_T. + */ +template +struct NbrDefault { + using nbr_t = vineyard::property_graph_utils::Nbr; + using nbr_unit_t = vineyard::property_graph_utils::NbrUnit; + using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE; + + public: + explicit NbrDefault(const prop_id_t& default_prop_id) + : default_prop_id_(default_prop_id) {} + NbrDefault(const nbr_t& nbr, const prop_id_t& default_prop_id) + : nbr_(nbr), default_prop_id_(default_prop_id) {} + NbrDefault(const NbrDefault& rhs) + : nbr_(rhs.nbr_), default_prop_id_(rhs.default_prop_id_) {} + NbrDefault(NbrDefault&& rhs) + : nbr_(rhs.nbr_), default_prop_id_(rhs.default_prop_id_) {} + + NbrDefault& operator=(const NbrDefault& rhs) { + nbr_ = rhs.nbr_; + default_prop_id_ = rhs.default_prop_id_; + return *this; + } + + NbrDefault& operator=(NbrDefault&& rhs) { + nbr_ = std::move(rhs.nbr_); + default_prop_id_ = rhs.default_prop_id_; + return *this; + } + + NbrDefault& operator=(const nbr_t& nbr) { + nbr_ = nbr; + return *this; + } + + NbrDefault& operator=(nbr_t&& nbr) { + nbr_ = std::move(nbr); + return *this; + } + + grape::Vertex neighbor() const { return nbr_.neighbor(); } + + grape::Vertex get_neighbor() const { return nbr_.get_neighbor(); } + + EID_T edge_id() const { return nbr_.edge_id(); } + + EDATA_T get_data() const { + return nbr_.template get_data(default_prop_id_); + } + + std::string get_str() const { return nbr_.get_str(default_prop_id_); } + + double get_double() const { return nbr_.get_double(default_prop_id_); } + + int64_t get_int() const { return nbr_.get_int(default_prop_id_); } + + inline const NbrDefault& operator++() const { + ++nbr_; + return *this; + } + + inline NbrDefault operator++(int) const { + NbrDefault ret(nbr_, default_prop_id_); + ++(*this); + return ret; + } + + inline const NbrDefault& operator--() const { + --nbr_; + return *this; + } + + inline NbrDefault operator--(int) const { + NbrDefault ret(nbr_, default_prop_id_); + --(*this); + return ret; + } + + inline bool operator==(const NbrDefault& rhs) const { + return nbr_ == rhs.nbr_; + } + inline bool operator!=(const NbrDefault& rhs) const { + return nbr_ != rhs.nbr_; + } + inline bool operator<(const NbrDefault& rhs) const { return nbr_ < rhs.nbr_; } + + inline bool operator==(const nbr_t& nbr) const { return nbr_ == nbr; } + inline bool operator!=(const nbr_t& nbr) const { return nbr_ != nbr; } + inline bool operator<(const nbr_t& nbr) const { return nbr_ < nbr; } + + inline const NbrDefault& operator*() const { return *this; } + + private: + nbr_t nbr_; + prop_id_t default_prop_id_; +}; + +/** + * @brief Union of all iteratable adjencent lists of a vertex. The union + * list contains all neighbors in format of NbrDefault, which contains the other + * Node and the data on the Edge. The lists must be non-empty to construct the + * UnionAdjList. + * + * @tparam VID_T + * @tparam EID_T + * @tparam EDATA_T + */ +template +class UnionAdjList { + public: + using nbr_t = NbrDefault; + using nbr_unit_t = vineyard::property_graph_utils::Nbr; + using adj_list_t = vineyard::property_graph_utils::AdjList; + using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE; + + UnionAdjList() : size_(0) {} + explicit UnionAdjList(const std::vector& adj_lists, + const prop_id_t& default_prop_id) + : adj_lists_(adj_lists), default_prop_id_(default_prop_id) { + size_ = 0; + for (auto& adj_list : adj_lists) { + size_ += adj_list.Size(); + } + } + + class iterator { + using pointer_type = nbr_t*; + using reference_type = nbr_t&; + + public: + iterator() = default; + explicit iterator(const std::vector& adj_lists, + const nbr_unit_t& nbr, const prop_id_t& default_prop_id, + size_t index) + : adj_lists_(adj_lists), + curr_nbr_(default_prop_id), + curr_list_index_(index) { + curr_nbr_ = nbr; + } + explicit iterator(const std::vector& adj_lists, + const nbr_t& nbr, size_t list_index) + : adj_lists_(adj_lists), curr_nbr_(nbr), curr_list_index_(list_index) {} + + reference_type operator*() noexcept { return curr_nbr_; } + + pointer_type operator->() noexcept { return &curr_nbr_; } + + iterator& operator++() { + if (++curr_nbr_ == adj_lists_.get()[curr_list_index_].end()) { + ++curr_list_index_; + if (curr_list_index_ < adj_lists_.get().size()) { + curr_nbr_ = adj_lists_.get()[curr_list_index_].begin(); + } + } + return *this; + } + + iterator operator++(int) { + iterator ret(adj_lists_.get(), curr_nbr_, curr_list_index_); + ++(*this); + return ret; + } + + bool operator==(const iterator& rhs) noexcept { + return curr_nbr_ == rhs.curr_nbr_; + } + + bool operator!=(const iterator& rhs) noexcept { + return curr_nbr_ != rhs.curr_nbr_; + } + + private: + std::reference_wrapper> adj_lists_; + nbr_t curr_nbr_; + size_t curr_list_index_; + }; + + iterator begin() { + if (size_ == 0) { + nbr_unit_t nbr; + return iterator(adj_lists_, nbr, default_prop_id_, 0); + } else { + return iterator(adj_lists_, adj_lists_.front().begin(), default_prop_id_, + 0); + } + } + + iterator end() { + if (size_ == 0) { + nbr_unit_t nbr; + return iterator(adj_lists_, nbr, default_prop_id_, 0); + } else { + return iterator(adj_lists_, adj_lists_.back().end(), default_prop_id_, + adj_lists_.size()); + } + } + + inline bool Empty() const { return adj_lists_.empty(); } + + inline bool NotEmpty() const { return !Empty(); } + + inline size_t Size() const { return size_; } + + private: + std::vector adj_lists_; + prop_id_t default_prop_id_; + size_t size_; +}; + +/** + * @brief Union of a set of VertexArray. UnionVertexArray is construct with + * UnionVertexRange. + * + * @tparam T + * @tparam VID_T + */ +template +class UnionVertexArray { + public: + UnionVertexArray() = default; + explicit UnionVertexArray(const UnionVertexRange& vertices) { + Init(vertices); + } + + UnionVertexArray(const UnionVertexRange& vertices, const T& value) { + Init(vertices, value); + } + + ~UnionVertexArray() = default; + + void Init(const UnionVertexRange& vertices) { + ranges_ = vertices.GetVertexRanges(); + vertex_arrays_.resize(ranges_.size()); + for (size_t i = 0; i < ranges_.size(); ++i) { + vertex_arrays_[i].Init(ranges_[i]); + } + } + + void Init(const UnionVertexRange& vertices, const T& value) { + ranges_ = vertices.GetVertexRanges(); + vertex_arrays_.resize(ranges_.size()); + for (size_t i = 0; i < ranges_.size(); ++i) { + vertex_arrays_[i].Init(ranges_[i], value); + } + } + + void SetValue(const UnionVertexRange& vertices, const T& value) { + ranges_ = vertices.GetVertexRanges(); + vertex_arrays_.resize(ranges_.size()); + for (size_t i = 0; i < ranges_.size(); ++i) { + vertex_arrays_[i].SetValue(ranges_[i], value); + } + } + + void SetValue(const T& value) { + for (auto& array : vertex_arrays_) { + array.SetValue(value); + } + } + + inline T& operator[](const grape::Vertex& loc) { + auto range_index = getRangeIndex(loc); + return vertex_arrays_[range_index][loc]; + } + + inline const T& operator[](const grape::Vertex& loc) const { + auto range_index = getRangeIndex(loc); + return vertex_arrays_[range_index][loc]; + } + + void Swap(UnionVertexArray& rhs) { + ranges_.swap(rhs.ranges_); + vertex_arrays_.swap(rhs.vertex_arrays_); + } + + void Clear() { + ranges_._clear(); + vertex_arrays_.clear(); + } + + UnionVertexRange GetVertexRange() const { + return UnionVertexRange(ranges_); + } + + private: + size_t getRangeIndex(const grape::Vertex& loc) const { + const auto& value = loc.GetValue(); + for (size_t i = 0; i < ranges_.size(); ++i) { + if (value >= ranges_[i].begin().GetValue() && + value < ranges_[i].end().GetValue()) { + return i; + } + } + return 0; + } + + std::vector> ranges_; + std::vector> vertex_arrays_; +}; + +class UnionDestList { + public: + explicit UnionDestList(const std::vector& dest_lists) { + std::set dstset; + for (auto& dsts : dest_lists) { + grape::fid_t* ptr = dsts.begin; + while (ptr != dsts.end) { + dstset.insert(*(ptr++)); + } + } + for (auto fid : dstset) { + fid_list_.push_back(fid); + } + + begin = fid_list_.data(); + end = fid_list_.data() + fid_list_.size(); + } + + grape::fid_t* begin; + grape::fid_t* end; + + private: + std::vector fid_list_; +}; +} // namespace arrow_flattened_fragment_impl + +/** + * @brief This class represents the fragment flattened from ArrowFragment. + * Different from ArrowProjectedFragment, an ArrowFlattenedFragment derives from + * an ArrowFragment, but flattens all the labels to one type, result in a graph + * with a single type of vertices and a single type of edges. Optionally, + * a common property across labels of vertices(reps., edges) in the + * ArrowFragment will be reserved as vdata(resp, edata). + * ArrowFlattenedFragment usually used as a wrapper for ArrowFragment to run the + * applications/algorithms defined in NetworkX or Analytical engine, + * since these algorithms need the topology of the whole (property) graph. + * + * @tparam OID_T + * @tparam VID_T + * @tparam VDATA_T + * @tparam EDATA_T + */ +template +class ArrowFlattenedFragment { + public: + using fragment_t = vineyard::ArrowFragment; + using oid_t = OID_T; + using vid_t = VID_T; + using eid_t = typename fragment_t::eid_t; + using vdata_t = VDATA_T; + using edata_t = EDATA_T; + using vertex_t = typename fragment_t::vertex_t; + using fid_t = grape::fid_t; + using label_id_t = typename fragment_t::label_id_t; + using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE; + using vertex_range_t = arrow_flattened_fragment_impl::UnionVertexRange; + template + using vertex_array_t = + arrow_flattened_fragment_impl::UnionVertexArray; + using adj_list_t = + arrow_flattened_fragment_impl::UnionAdjList; + using dest_list_t = arrow_flattened_fragment_impl::UnionDestList; + + // This member is used by grape::check_load_strategy_compatible() + static constexpr grape::LoadStrategy load_strategy = + grape::LoadStrategy::kBothOutIn; + + ArrowFlattenedFragment() = default; + + explicit ArrowFlattenedFragment(fragment_t* frag, prop_id_t v_prop_id, + prop_id_t e_prop_id) + : fragment_(frag), v_prop_id_(v_prop_id), e_prop_id_(e_prop_id) { + ivnum_ = ovnum_ = tvnum_ = 0; + for (label_id_t v_label = 0; v_label < fragment_->vertex_label_num(); + v_label++) { + ivnum_ += fragment_->GetInnerVerticesNum(v_label); + ovnum_ += fragment_->GetOuterVerticesNum(v_label); + tvnum_ += fragment_->GetVerticesNum(v_label); + } + } + + virtual ~ArrowFlattenedFragment() = default; + + static std::shared_ptr> + Project(const std::shared_ptr& frag, const std::string& v_prop, + const std::string& e_prop) { + prop_id_t v_prop_id = boost::lexical_cast(v_prop); + prop_id_t e_prop_id = boost::lexical_cast(e_prop); + return std::make_shared(frag.get(), v_prop_id, + e_prop_id); + } + + void PrepareToRunApp(grape::MessageStrategy strategy, bool need_split_edges) { + fragment_->PrepareToRunApp(strategy, need_split_edges); + } + + inline fid_t fid() const { return fragment_->fid(); } + + inline fid_t fnum() const { return fragment_->fnum(); } + + inline bool directed() const { return fragment_->directed(); } + + inline vertex_range_t Vertices() const { + std::vector> vertex_ranges; + vertex_ranges.reserve(fragment_->vertex_label_num()); + for (label_id_t v_label = 0; v_label < fragment_->vertex_label_num(); + v_label++) { + auto range = fragment_->Vertices(v_label); + if (range.size() != 0) { + vertex_ranges.push_back(range); + } + } + return vertex_range_t(vertex_ranges); + } + + inline vertex_range_t InnerVertices() const { + std::vector> vertex_ranges; + vertex_ranges.reserve(fragment_->vertex_label_num()); + for (label_id_t v_label = 0; v_label < fragment_->vertex_label_num(); + v_label++) { + auto range = fragment_->InnerVertices(v_label); + if (range.size() != 0) { + vertex_ranges.push_back(range); + } + } + return vertex_range_t(vertex_ranges); + } + + inline vertex_range_t OuterVertices() const { + std::vector> vertex_ranges; + vertex_ranges.reserve(fragment_->vertex_label_num()); + for (label_id_t v_label = 0; v_label < fragment_->vertex_label_num(); + v_label++) { + auto range = fragment_->OuterVertices(v_label); + if (range.size() != 0) { + vertex_ranges.push_back(range); + } + } + return vertex_range_t(vertex_ranges); + } + + inline label_id_t vertex_label(const vertex_t& v) const { + return fragment_->vertex_label(v); + } + + inline bool GetVertex(const oid_t& oid, vertex_t& v) const { + for (label_id_t v_label = 0; v_label < fragment_->vertex_label_num(); + v_label++) { + if (fragment_->GetVertex(v_label, oid, v)) { + return true; + } + } + return false; + } + + inline oid_t GetId(const vertex_t& v) const { return fragment_->GetId(v); } + + inline fid_t GetFragId(const vertex_t& u) const { + return fragment_->GetFragId(u); + } + + inline bool Gid2Vertex(const vid_t& gid, vertex_t& v) const { + return fragment_->Gid2Vertex(gid, v); + } + + inline vid_t Vertex2Gid(const vertex_t& v) const { + return fragment_->Vertex2Gid(v); + } + + inline vdata_t GetData(const vertex_t& v) const { + return fragment_->template GetData(v, v_prop_id_); + } + + inline vid_t GetInnerVerticesNum() const { return ivnum_; } + + inline vid_t GetOuterVerticesNum() const { return ovnum_; } + + inline vid_t GetVerticesNum() const { return tvnum_; } + + inline size_t GetTotalVerticesNum() const { + return fragment_->GetTotalVerticesNum(); + } + + inline size_t GetEdgeNum() const { return fragment_->GetEdgeNum(); } + + inline bool IsInnerVertex(const vertex_t& v) const { + return fragment_->IsInnerVertex(v); + } + + inline bool IsOuterVertex(const vertex_t& v) const { + return fragment_->IsOuterVertex(v); + } + + inline bool GetInnerVertex(const oid_t& oid, vertex_t& v) const { + for (label_id_t v_label = 0; v_label < fragment_->vertex_label_num(); + v_label++) { + if (fragment_->GetInnerVertex(v_label, oid, v)) { + return true; + } + } + return false; + } + + inline bool GetOuterVertex(const oid_t& oid, vertex_t& v) const { + for (label_id_t v_label = 0; v_label < fragment_->vertex_label_num(); + v_label++) { + if (fragment_->GetOuterVertex(v_label, oid, v)) { + return true; + } + } + return false; + } + + inline oid_t GetInnerVertexId(const vertex_t& v) const { + return fragment_->GetInnerVertexId(v); + } + + inline oid_t GetOuterVertexId(const vertex_t& v) const { + return fragment_->GetOuterVertexId(v); + } + + inline oid_t Gid2Oid(const vid_t& gid) const { + return fragment_->Gid2Oid(gid); + } + + inline bool Oid2Gid(const oid_t& oid, vid_t& gid) const { + for (label_id_t label = 0; label < fragment_->vertex_label_num(); label++) { + if (fragment_->Oid2Gid(label, oid, gid)) { + return true; + } + } + return false; + } + + inline bool InnerVertexGid2Vertex(const vid_t& gid, vertex_t& v) const { + return fragment_->InnerVertexGid2Vertex(gid, v); + } + + inline bool OuterVertexGid2Vertex(const vid_t& gid, vertex_t& v) const { + return fragment_->OuterVertexGid2Vertex(gid, v); + } + + inline vid_t GetOuterVertexGid(const vertex_t& v) const { + return fragment_->GetOuterVertexGid(v); + } + + inline vid_t GetInnerVertexGid(const vertex_t& v) const { + return fragment_->GetInnerVertexGid(v); + } + + inline adj_list_t GetOutgoingAdjList(const vertex_t& v) const { + std::vector> + adj_lists; + adj_lists.reserve(fragment_->edge_label_num()); + for (label_id_t e_label = 0; e_label < fragment_->edge_label_num(); + e_label++) { + auto adj_list = fragment_->GetOutgoingAdjList(v, e_label); + if (adj_list.NotEmpty()) { + adj_lists.push_back(adj_list); + } + } + return adj_list_t(adj_lists, e_prop_id_); + } + + inline adj_list_t GetIncomingAdjList(const vertex_t& v) const { + std::vector> + adj_lists; + adj_lists.reserve(fragment_->edge_label_num()); + for (label_id_t e_label = 0; e_label < fragment_->edge_label_num(); + e_label++) { + auto adj_list = fragment_->GetIncomingAdjList(v, e_label); + if (adj_list.NotEmpty()) { + adj_lists.push_back(adj_list); + } + } + return adj_list_t(adj_lists, e_prop_id_); + } + + inline int GetLocalOutDegree(const vertex_t& v) const { + int local_out_degree = 0; + for (label_id_t e_label = 0; e_label < fragment_->edge_label_num(); + e_label++) { + local_out_degree += fragment_->GetLocalOutDegree(v, e_label); + } + return local_out_degree; + } + + inline int GetLocalInDegree(const vertex_t& v) const { + int local_in_degree = 0; + for (label_id_t e_label = 0; e_label < fragment_->edge_label_num(); + e_label++) { + local_in_degree += fragment_->GetLocalInDegree(v, e_label); + } + return local_in_degree; + } + + inline dest_list_t IEDests(const vertex_t& v) const { + std::vector dest_lists; + dest_lists.reserve(fragment_->edge_label_num()); + for (label_id_t e_label = 0; e_label < fragment_->edge_label_num(); + e_label++) { + dest_lists.push_back(fragment_->IEDests(v, e_label)); + } + return dest_list_t(dest_lists); + } + + inline dest_list_t OEDests(const vertex_t& v) const { + std::vector dest_lists; + dest_lists.reserve(fragment_->edge_label_num()); + for (label_id_t e_label = 0; e_label < fragment_->edge_label_num(); + e_label++) { + dest_lists.push_back(fragment_->OEDests(v, e_label)); + } + return dest_list_t(dest_lists); + } + + inline dest_list_t IOEDests(const vertex_t& v) const { + std::vector dest_lists; + dest_lists.reserve(fragment_->edge_label_num()); + for (label_id_t e_label = 0; e_label < fragment_->edge_label_num(); + e_label++) { + dest_lists.push_back(fragment_->IOEDests(v, e_label)); + } + return dest_list_t(dest_lists); + } + + private: + fragment_t* fragment_; + prop_id_t v_prop_id_; + prop_id_t e_prop_id_; + vid_t ivnum_; + vid_t ovnum_; + vid_t tvnum_; +}; + +} // namespace gs +#endif // ANALYTICAL_ENGINE_CORE_FRAGMENT_ARROW_FLATTENED_FRAGMENT_H_ diff --git a/analytical_engine/core/fragment/arrow_projected_fragment.h b/analytical_engine/core/fragment/arrow_projected_fragment.h index 180b79e13f65..391126f08896 100644 --- a/analytical_engine/core/fragment/arrow_projected_fragment.h +++ b/analytical_engine/core/fragment/arrow_projected_fragment.h @@ -325,8 +325,9 @@ class AdjList { } // namespace arrow_projected_fragment_impl /** - * @brief This class represents the fragment projected from ArrowFragment. The - * fragment has no label and property. + * @brief This class represents the fragment projected from ArrowFragment which + * contains only one vertex label and edge label. The fragment has no label and + * property. * * @tparam OID_T OID type * @tparam VID_T VID type diff --git a/analytical_engine/core/fragment/dynamic_fragment.h b/analytical_engine/core/fragment/dynamic_fragment.h index bbf9c3339183..8ab89eadc60c 100644 --- a/analytical_engine/core/fragment/dynamic_fragment.h +++ b/analytical_engine/core/fragment/dynamic_fragment.h @@ -800,7 +800,7 @@ class DynamicFragment { } // generate undirected graph from original directed graph. - void ToUnDirectedFrom(std::shared_ptr origin) { + void ToUndirectedFrom(std::shared_ptr origin) { // original graph must be directed. assert(origin->directed()); diff --git a/analytical_engine/core/fragment/dynamic_projected_fragment.h b/analytical_engine/core/fragment/dynamic_projected_fragment.h index b1ef59b79e86..337460469128 100644 --- a/analytical_engine/core/fragment/dynamic_projected_fragment.h +++ b/analytical_engine/core/fragment/dynamic_projected_fragment.h @@ -502,6 +502,149 @@ class DynamicProjectedFragment { e_prop); } + void PrepareToRunApp(grape::MessageStrategy strategy, bool need_split_edges) { + fragment_->PrepareToRunApp(strategy, need_split_edges); + } + + inline fid_t fid() const { return fragment_->fid_; } + + inline fid_t fnum() const { return fragment_->fnum_; } + + inline vid_t id_mask() const { return fragment_->id_mask_; } + + inline int fid_offset() const { return fragment_->fid_offset_; } + + inline bool directed() const { return fragment_->directed(); } + + inline vertex_range_t Vertices() const { return fragment_->Vertices(); } + + inline vertex_range_t InnerVertices() const { + return fragment_->InnerVertices(); + } + + inline vertex_range_t OuterVertices() const { + return fragment_->OuterVertices(); + } + + inline bool GetVertex(const oid_t& oid, vertex_t& v) const { + return fragment_->GetVertex(oid, v); + } + + inline const vid_t* GetOuterVerticesGid() const { + return fragment_->GetOuterVerticesGid(); + } + + inline oid_t GetId(const vertex_t& v) const { return fragment_->GetId(v); } + + inline fid_t GetFragId(const vertex_t& u) const { + return fragment_->GetFragId(u); + } + + inline bool Gid2Vertex(const vid_t& gid, vertex_t& v) const { + return fragment_->Gid2Vertex(gid, v); + } + + inline vid_t Vertex2Gid(const vertex_t& v) const { + return fragment_->Vertex2Gid(v); + } + + inline vdata_t GetData(const vertex_t& v) const { + assert(fragment_->IsInnerVertex(v)); + auto data = fragment_->vdata()[v.GetValue()]; + return dynamic_projected_fragment_impl::unpack_dynamic( + data, v_prop_key_); + } + + inline void SetData(const vertex_t& v, const vdata_t& val) { + assert(fragment_->IsInnerVertex(v)); + dynamic_projected_fragment_impl::pack_dynamic( + fragment_->vdata()[v.GetValue()][v_prop_key_], val); + } + + inline vid_t GetInnerVerticesNum() const { + return fragment_->GetInnerVerticesNum(); + } + + inline vid_t GetOuterVerticesNum() const { + return fragment_->GetOuterVerticesNum(); + } + + inline vid_t GetVerticesNum() const { return fragment_->GetVerticesNum(); } + + size_t GetTotalVerticesNum() const { + return fragment_->GetTotalVerticesNum(); + } + + inline size_t GetEdgeNum() const { return fragment_->GetEdgeNum(); } + + inline bool IsInnerVertex(const vertex_t& v) const { + return fragment_->IsInnerVertex(v); + } + + inline bool IsOuterVertex(const vertex_t& v) const { + return fragment_->IsOuterVertex(v); + } + + inline bool GetInnerVertex(const oid_t& oid, vertex_t& v) const { + return fragment_->GetInnerVertex(oid, v); + } + + inline bool GetOuterVertex(const oid_t& oid, vertex_t& v) const { + return fragment_->GetOuterVertex(oid, v); + } + + inline oid_t GetInnerVertexId(const vertex_t& v) const { + return fragment_->GetInnerVertexId(v); + } + + inline oid_t GetOuterVertexId(const vertex_t& v) const { + return fragment_->GetOuterVertexId(v); + } + + inline oid_t Gid2Oid(const vid_t& gid) const { + return fragment_->Gid2Oid(gid); + } + + inline bool Oid2Gid(const oid_t& oid, vid_t& gid) const { + return fragment_->Oid2Gid(oid, gid); + } + + inline bool InnerVertexGid2Vertex(const vid_t& gid, vertex_t& v) const { + return fragment_->InnerVertexGid2Vertex(gid, v); + } + + inline bool OuterVertexGid2Vertex(const vid_t& gid, vertex_t& v) const { + return fragment_->OuterVertexGid2Vertex(gid, v); + } + + inline vid_t GetOuterVertexGid(const vertex_t& v) const { + return fragment_->GetOuterVertexGid(v); + } + + inline vid_t GetInnerVertexGid(const vertex_t& v) const { + return fragment_->GetInnerVertexGid(v); + } + + inline bool IsAliveVertex(const vertex_t& v) const { + return fragment_->IsAliveVertex(v); + } + + inline bool IsAliveInnerVertex(const vertex_t& v) const { + return fragment_->IsAliveInnerVertex(v); + } + + inline bool IsAliveOuterVertex(const vertex_t& v) const { + return fragment_->IsAliveOuterVertex(v); + } + + inline bool HasChild(const vertex_t& v) const { + return fragment_->HasChild(v); + } + + inline bool HasParent(const vertex_t& v) const { + return fragment_->HasParent(v); + } + inline projected_adj_linked_list_t GetIncomingAdjList(const vertex_t& v) { int32_t ie_pos; if (fragment_->duplicated() && fragment_->IsOuterVertex(v)) { @@ -664,69 +807,6 @@ class DynamicProjectedFragment { fragment_->inner_edge_space().OuterNbr(oe_pos).cend()); } - inline fid_t fid() const { return fragment_->fid_; } - - inline fid_t fnum() const { return fragment_->fnum_; } - - inline vid_t id_mask() const { return fragment_->id_mask_; } - - inline int fid_offset() const { return fragment_->fid_offset_; } - - inline bool directed() const { return fragment_->directed(); } - - inline const vid_t* GetOuterVerticesGid() const { - return fragment_->GetOuterVerticesGid(); - } - - inline size_t GetEdgeNum() const { return fragment_->GetEdgeNum(); } - - inline vid_t GetVerticesNum() const { return fragment_->GetVerticesNum(); } - - size_t GetTotalVerticesNum() const { - return fragment_->GetTotalVerticesNum(); - } - - inline vertex_range_t Vertices() const { return fragment_->Vertices(); } - - inline vertex_range_t InnerVertices() const { - return fragment_->InnerVertices(); - } - - inline vertex_range_t OuterVertices() const { - return fragment_->OuterVertices(); - } - - inline bool GetVertex(const oid_t& oid, vertex_t& v) const { - return fragment_->GetVertex(oid, v); - } - - inline oid_t GetId(const vertex_t& v) const { return fragment_->GetId(v); } - - inline fid_t GetFragId(const vertex_t& u) const { - return fragment_->GetFragId(u); - } - - inline vdata_t GetData(const vertex_t& v) const { - assert(fragment_->IsInnerVertex(v)); - auto data = fragment_->vdata()[v.GetValue()]; - return dynamic_projected_fragment_impl::unpack_dynamic( - data, v_prop_key_); - } - - inline void SetData(const vertex_t& v, const vdata_t& val) { - assert(fragment_->IsInnerVertex(v)); - dynamic_projected_fragment_impl::pack_dynamic( - fragment_->vdata()[v.GetValue()][v_prop_key_], val); - } - - inline bool HasChild(const vertex_t& v) const { - return fragment_->HasChild(v); - } - - inline bool HasParent(const vertex_t& v) const { - return fragment_->HasParent(v); - } - inline int GetLocalOutDegree(const vertex_t& v) const { return fragment_->GetLocalOutDegree(v); } @@ -735,82 +815,6 @@ class DynamicProjectedFragment { return fragment_->GetLocalInDegree(v); } - inline bool Gid2Vertex(const vid_t& gid, vertex_t& v) const { - return fragment_->Gid2Vertex(gid, v); - } - - inline vid_t Vertex2Gid(const vertex_t& v) const { - return fragment_->Vertex2Gid(v); - } - - inline vid_t GetInnerVerticesNum() const { - return fragment_->GetInnerVerticesNum(); - } - - inline vid_t GetOuterVerticesNum() const { - return fragment_->GetOuterVerticesNum(); - } - - inline bool IsInnerVertex(const vertex_t& v) const { - return fragment_->IsInnerVertex(v); - } - - inline bool IsOuterVertex(const vertex_t& v) const { - return fragment_->IsOuterVertex(v); - } - - inline bool GetInnerVertex(const oid_t& oid, vertex_t& v) const { - return fragment_->GetInnerVertex(oid, v); - } - - inline bool GetOuterVertex(const oid_t& oid, vertex_t& v) const { - return fragment_->GetOuterVertex(oid, v); - } - - inline oid_t GetInnerVertexId(const vertex_t& v) const { - return fragment_->GetInnerVertexId(v); - } - - inline oid_t GetOuterVertexId(const vertex_t& v) const { - return fragment_->GetOuterVertexId(v); - } - - inline oid_t Gid2Oid(const vid_t& gid) const { - return fragment_->Gid2Oid(gid); - } - - inline bool Oid2Gid(const oid_t& oid, vid_t& gid) const { - return fragment_->Oid2Gid(oid, gid); - } - - inline bool InnerVertexGid2Vertex(const vid_t& gid, vertex_t& v) const { - return fragment_->InnerVertexGid2Vertex(gid, v); - } - - inline bool OuterVertexGid2Vertex(const vid_t& gid, vertex_t& v) const { - return fragment_->OuterVertexGid2Vertex(gid, v); - } - - inline vid_t GetOuterVertexGid(const vertex_t& v) const { - return fragment_->GetOuterVertexGid(v); - } - - inline vid_t GetInnerVertexGid(const vertex_t& v) const { - return fragment_->GetInnerVertexGid(v); - } - - inline bool IsAliveVertex(const vertex_t& v) const { - return fragment_->IsAliveVertex(v); - } - - inline bool IsAliveInnerVertex(const vertex_t& v) const { - return fragment_->IsAliveInnerVertex(v); - } - - inline bool IsAliveOuterVertex(const vertex_t& v) const { - return fragment_->IsAliveOuterVertex(v); - } - inline grape::DestList IEDests(const vertex_t& v) const { return fragment_->IEDests(v); } @@ -827,10 +831,6 @@ class DynamicProjectedFragment { return fragment_->MirrorVertices(fid); } - void PrepareToRunApp(grape::MessageStrategy strategy, bool need_split_edges) { - fragment_->PrepareToRunApp(strategy, need_split_edges); - } - bl::result GetOidType( const grape::CommSpec& comm_spec) const { return fragment_->GetOidType(comm_spec); diff --git a/analytical_engine/core/grape_instance.cc b/analytical_engine/core/grape_instance.cc index 0522b38c6baf..6d7ac212030e 100644 --- a/analytical_engine/core/grape_instance.cc +++ b/analytical_engine/core/grape_instance.cc @@ -670,7 +670,7 @@ bl::result GrapeInstance::toUnDirected( std::string dst_graph_name = "graph_" + generateId(); BOOST_LEAF_AUTO(dst_wrapper, - src_wrapper->ToUnDirected(comm_spec_, dst_graph_name)); + src_wrapper->ToUndirected(comm_spec_, dst_graph_name)); BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper)); return dst_wrapper->graph_def(); #else @@ -888,7 +888,8 @@ bl::result GrapeInstance::registerGraphType(const rpc::GSParams& params) { BOOST_LEAF_CHECK(utils->Init()); return object_manager_.PutObject(utils); } else if (graph_type == rpc::graph::ARROW_PROJECTED || - graph_type == rpc::graph::DYNAMIC_PROJECTED) { + graph_type == rpc::graph::DYNAMIC_PROJECTED || + graph_type == rpc::graph::ARROW_FLATTENED) { auto projector = std::make_shared(type_sig, lib_path); BOOST_LEAF_CHECK(projector->Init()); return object_manager_.PutObject(projector); diff --git a/analytical_engine/core/object/fragment_wrapper.h b/analytical_engine/core/object/fragment_wrapper.h index e63302497ad4..1cb4a1af19c6 100644 --- a/analytical_engine/core/object/fragment_wrapper.h +++ b/analytical_engine/core/object/fragment_wrapper.h @@ -30,6 +30,7 @@ #include "core/context/vertex_data_context.h" #include "core/context/vertex_property_context.h" #include "core/error.h" +#include "core/fragment/arrow_flattened_fragment.h" #include "core/fragment/dynamic_fragment_view.h" #include "core/fragment/dynamic_projected_fragment.h" #include "core/fragment/fragment_reporter.h" @@ -582,7 +583,7 @@ class FragmentWrapper> "Cannot convert to the directed ArrowFragment"); } - bl::result> ToUnDirected( + bl::result> ToUndirected( const grape::CommSpec& comm_spec, const std::string& dst_graph_name) override { RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, @@ -648,7 +649,7 @@ class FragmentWrapper> "Cannot convert to the directed DynamicProjectedFragment"); } - bl::result> ToUnDirected( + bl::result> ToUndirected( const grape::CommSpec& comm_spec, const std::string& dst_graph_name) override { RETURN_GS_ERROR( @@ -778,7 +779,7 @@ class FragmentWrapper : public IFragmentWrapper { return std::dynamic_pointer_cast(wrapper); } - bl::result> ToUnDirected( + bl::result> ToUndirected( const grape::CommSpec& comm_spec, const std::string& dst_graph_name) override { // copy vertex map @@ -807,7 +808,7 @@ class FragmentWrapper : public IFragmentWrapper { // copy fragment auto dst_frag = std::make_shared(new_vm_ptr); - dst_frag->ToUnDirectedFrom(fragment_); + dst_frag->ToUndirectedFrom(fragment_); auto dst_graph_def = graph_def_; dst_graph_def.set_key(dst_graph_name); @@ -881,7 +882,7 @@ class FragmentWrapper> "Cannot convert to the directed DynamicProjectedFragment"); } - bl::result> ToUnDirected( + bl::result> ToUndirected( const grape::CommSpec& comm_spec, const std::string& dst_graph_name) override { RETURN_GS_ERROR( @@ -900,6 +901,71 @@ class FragmentWrapper> rpc::graph::GraphDefPb graph_def_; std::shared_ptr fragment_; }; + +/** + * @brief A specialized FragmentWrapper for ArrowFlattenedFragment. + */ +template +class FragmentWrapper> + : public IFragmentWrapper { + using fragment_t = ArrowFlattenedFragment; + + public: + FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def, + std::shared_ptr fragment) + : IFragmentWrapper(id), + graph_def_(std::move(graph_def)), + fragment_(std::move(fragment)) { + CHECK_EQ(graph_def_.graph_type(), rpc::graph::ARROW_FLATTENED); + } + + std::shared_ptr fragment() const override { + return std::static_pointer_cast(fragment_); + } + + const rpc::graph::GraphDefPb& graph_def() const override { + return graph_def_; + } + + bl::result ReportGraph(const grape::CommSpec& comm_spec, + const rpc::GSParams& params) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Not implemented."); + } + + bl::result> CopyGraph( + const grape::CommSpec& comm_spec, const std::string& dst_graph_name, + const std::string& copy_type) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Cannot copy the ArrowFlattenedFragment"); + } + + bl::result> ToDirected( + const grape::CommSpec& comm_spec, + const std::string& dst_graph_name) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Cannot convert to the directed ArrowFlattenedFragment"); + } + + bl::result> ToUndirected( + const grape::CommSpec& comm_spec, + const std::string& dst_graph_name) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Cannot convert to the undirected ArrowFlattenedFragment"); + } + + bl::result> CreateGraphView( + const grape::CommSpec& comm_spec, const std::string& dst_graph_name, + const std::string& copy_type) override { + RETURN_GS_ERROR( + vineyard::ErrorCode::kInvalidOperationError, + "Cannot generate a graph view over the ArrowFlattenedFragment."); + } + + private: + rpc::graph::GraphDefPb graph_def_; + std::shared_ptr fragment_; +}; #endif } // namespace gs diff --git a/analytical_engine/core/object/i_fragment_wrapper.h b/analytical_engine/core/object/i_fragment_wrapper.h index f2065236cf38..4655bf52f8b1 100644 --- a/analytical_engine/core/object/i_fragment_wrapper.h +++ b/analytical_engine/core/object/i_fragment_wrapper.h @@ -55,7 +55,7 @@ class IFragmentWrapper : public GSObject { virtual bl::result> ToDirected( const grape::CommSpec& comm_spec, const std::string& dst_graph_name) = 0; - virtual bl::result> ToUnDirected( + virtual bl::result> ToUndirected( const grape::CommSpec& comm_spec, const std::string& dst_graph_name) = 0; virtual bl::result> CreateGraphView( diff --git a/analytical_engine/frame/project_frame.cc b/analytical_engine/frame/project_frame.cc index 74024f8e9759..6d357da0cd68 100644 --- a/analytical_engine/frame/project_frame.cc +++ b/analytical_engine/frame/project_frame.cc @@ -174,6 +174,54 @@ class ProjectSimpleFrame> { return std::dynamic_pointer_cast(wrapper); } }; + +template +class ProjectSimpleFrame< + gs::ArrowFlattenedFragment> { + using fragment_t = vineyard::ArrowFragment; + using projected_fragment_t = + gs::ArrowFlattenedFragment; + + public: + static bl::result> Project( + std::shared_ptr& input_wrapper, + const std::string& projected_graph_name, const rpc::GSParams& params) { + auto graph_type = input_wrapper->graph_def().graph_type(); + if (graph_type != rpc::graph::ARROW_PROPERTY) { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "graph_type should be ARROW_PROPERTY, got " + + rpc::graph::GraphTypePb_Name(graph_type)); + } + + BOOST_LEAF_AUTO(v_prop_key, params.Get(rpc::V_PROP_KEY)); + BOOST_LEAF_AUTO(e_prop_key, params.Get(rpc::E_PROP_KEY)); + auto input_frag = + std::static_pointer_cast(input_wrapper->fragment()); + auto projected_frag = + projected_fragment_t::Project(input_frag, v_prop_key, e_prop_key); + + rpc::graph::GraphDefPb graph_def; + + graph_def.set_key(projected_graph_name); + graph_def.set_graph_type(rpc::graph::ARROW_FLATTENED); + gs::rpc::graph::VineyardInfoPb vy_info; + if (graph_def.has_extension()) { + graph_def.extension().UnpackTo(&vy_info); + } + vy_info.set_oid_type(PropertyTypeToPb(vineyard::normalize_datatype( + vineyard::TypeName::Get()))); + vy_info.set_vid_type(PropertyTypeToPb(vineyard::normalize_datatype( + vineyard::TypeName::Get()))); + vy_info.set_vdata_type(PropertyTypeToPb(vineyard::normalize_datatype( + vineyard::TypeName::Get()))); + vy_info.set_edata_type(PropertyTypeToPb(vineyard::normalize_datatype( + vineyard::TypeName::Get()))); + graph_def.mutable_extension()->PackFrom(vy_info); + auto wrapper = std::make_shared>( + projected_graph_name, graph_def, projected_frag); + return std::dynamic_pointer_cast(wrapper); + } +}; #endif } // namespace gs diff --git a/coordinator/gscoordinator/builtin/app/.gs_conf.yaml b/coordinator/gscoordinator/builtin/app/.gs_conf.yaml index c135a042b9dd..982f3698ab6d 100644 --- a/coordinator/gscoordinator/builtin/app/.gs_conf.yaml +++ b/coordinator/gscoordinator/builtin/app/.gs_conf.yaml @@ -87,6 +87,7 @@ app: src: apps/centrality/degree/degree_centrality.h compatible_graph: - gs::DynamicFragment + - gs::ArrowFlattenedFragment - algo: eigenvector_centrality type: cpp_pie class_name: gs::EigenvectorCentrality @@ -253,4 +254,4 @@ app: class_name: gs::BetweennessCentralityGeneric src: apps/centrality/betweenness/betweenness_centrality_generic.h compatible_graph: - - gs::DynamicProjectedFragment + - gs::DynamicProjectedFragment diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index d74886963564..1010621656df 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -322,9 +322,10 @@ def compile_graph_frame(workspace: str, library_name, attr: dict, engine_config: ] if graph_type == graph_def_pb2.ARROW_PROPERTY: cmake_commands += ["-DPROPERTY_GRAPH_FRAME=True"] - elif ( - graph_type == graph_def_pb2.ARROW_PROJECTED - or graph_type == graph_def_pb2.DYNAMIC_PROJECTED + elif graph_type in ( + graph_def_pb2.ARROW_PROJECTED, + graph_def_pb2.DYNAMIC_PROJECTED, + graph_def_pb2.ARROW_FLATTENED, ): cmake_commands += ["-DPROJECT_FRAME=True"] else: @@ -725,7 +726,10 @@ def _pre_process_for_output_graph_op(op, op_result_pool, key_to_op, **kwargs): def _pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op, **kwargs): # for nx graph - if op.attr[types_pb2.GRAPH_TYPE].graph_type == graph_def_pb2.DYNAMIC_PROJECTED: + if op.attr[types_pb2.GRAPH_TYPE].graph_type in ( + graph_def_pb2.DYNAMIC_PROJECTED, + graph_def_pb2.ARROW_FLATTENED, + ): return assert len(op.parents) == 1 # get parent graph schema @@ -1115,6 +1119,10 @@ def _codegen_app_info(attr, meta_file: str): "gs::DynamicFragment", "core/fragment/dynamic_fragment.h", ), + graph_def_pb2.ARROW_FLATTENED: ( + "gs::ArrowFlattenedFragment", + "core/fragment/arrow_flattened_fragment.h", + ), } @@ -1142,6 +1150,14 @@ def _codegen_graph_info(attr): attr[types_pb2.V_DATA_TYPE].s.decode("utf-8"), attr[types_pb2.E_DATA_TYPE].s.decode("utf-8"), ) + elif graph_class == "gs::ArrowFlattenedFragment": + graph_fqn = "{}<{},{},{},{}>".format( + graph_class, + attr[types_pb2.OID_TYPE].s.decode("utf-8"), + attr[types_pb2.VID_TYPE].s.decode("utf-8"), + attr[types_pb2.V_DATA_TYPE].s.decode("utf-8"), + attr[types_pb2.E_DATA_TYPE].s.decode("utf-8"), + ) else: # gs::DynamicProjectedFragment graph_fqn = "{}<{},{}>".format( diff --git a/proto/graph_def.proto b/proto/graph_def.proto index 8476c820bac8..3afef6810c80 100644 --- a/proto/graph_def.proto +++ b/proto/graph_def.proto @@ -29,6 +29,7 @@ enum GraphTypePb { ARROW_PROPERTY = 4; ARROW_PROJECTED = 5; PERSISTENT_STORE = 6; + ARROW_FLATTENED = 7; } message MaxGraphInfoPb { @@ -42,7 +43,7 @@ message VineyardInfoPb { DataTypePb vid_type = 2; DataTypePb vdata_type = 3; DataTypePb edata_type = 4; - + string schema_path = 5; bool generate_eid = 6; int64 vineyard_id = 7; @@ -57,7 +58,7 @@ message GraphDefPb { repeated TypeDefPb type_defs = 5; repeated EdgeKindPb edge_kinds = 6; map property_name_to_id = 7; - // current extension supported: + // current extension supported: // - MaxGraphInfoPb // - VineyardInfoPb google.protobuf.Any extension = 8; diff --git a/python/graphscope/analytical/app/__init__.py b/python/graphscope/analytical/app/__init__.py index 6c1ab6f79807..6c4b0822afa7 100644 --- a/python/graphscope/analytical/app/__init__.py +++ b/python/graphscope/analytical/app/__init__.py @@ -24,6 +24,8 @@ numeric_assortativity_coefficient from graphscope.analytical.app.average_degree_connectivity import \ average_degree_connectivity +from graphscope.analytical.app.average_shortest_path_length import \ + average_shortest_path_length # fmt: on from graphscope.analytical.app.bfs import bfs diff --git a/python/graphscope/analytical/app/average_shortest_path_length.py b/python/graphscope/analytical/app/average_shortest_path_length.py new file mode 100644 index 000000000000..487a57adc9c4 --- /dev/null +++ b/python/graphscope/analytical/app/average_shortest_path_length.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + + +from graphscope.framework.app import AppAssets +from graphscope.framework.app import not_compatible_for +from graphscope.framework.app import project_to_simple + +__all__ = ["average_shortest_path_length"] + + +@project_to_simple +@not_compatible_for("arrow_property", "dynamic_property", "arrow_flattened") +def average_shortest_path_length(G): + r"""Returns the average shortest path length. + + The average shortest path length is + + .. math:: + + a =\sum_{s,t \in V} \frac{d(s, t)}{n(n-1)} + + where `V` is the set of nodes in `G`, + `d(s, t)` is the shortest path from `s` to `t`, + and `n` is the number of nodes in `G`. + + Parameters + ---------- + G : graph + + """ + ctx = AppAssets(algo="sssp_average_length", context="tensor")(G) + return ctx.to_numpy("r", axis=0)[0] diff --git a/python/graphscope/analytical/app/k_core.py b/python/graphscope/analytical/app/k_core.py index a7a01935192e..4c922cafc90c 100644 --- a/python/graphscope/analytical/app/k_core.py +++ b/python/graphscope/analytical/app/k_core.py @@ -25,7 +25,7 @@ @project_to_simple -@not_compatible_for("arrow_property", "dynamic_property") +@not_compatible_for("arrow_property", "dynamic_property", "arrow_flattened") def k_core(graph, k: int): """K-cores of the graph are connected components that are left after all vertices of degree less than `k` have been removed. diff --git a/python/graphscope/analytical/app/triangles.py b/python/graphscope/analytical/app/triangles.py index 0e2c52e8f0f9..54b15620f1ea 100644 --- a/python/graphscope/analytical/app/triangles.py +++ b/python/graphscope/analytical/app/triangles.py @@ -25,7 +25,7 @@ @project_to_simple -@not_compatible_for("arrow_property", "dynamic_property") +@not_compatible_for("arrow_property", "dynamic_property", "arrow_flattened") def triangles(graph): """Evaluate triangle counting of the graph G. diff --git a/python/graphscope/framework/app.py b/python/graphscope/framework/app.py index 68f446a4e275..25246c89aceb 100644 --- a/python/graphscope/framework/app.py +++ b/python/graphscope/framework/app.py @@ -94,6 +94,7 @@ def wrapper(*args, **kwargs): "arrow_projected": graph.graph_type == graph_def_pb2.ARROW_PROJECTED, "dynamic_projected": graph.graph_type == graph_def_pb2.DYNAMIC_PROJECTED, + "arrow_flattened": graph.graph_type == graph_def_pb2.ARROW_FLATTENED, } match = False try: @@ -101,7 +102,7 @@ def wrapper(*args, **kwargs): match = match or terms[t] except KeyError: raise InvalidArgumentError( - "Use one or more of arrow_property,dynamic_property,arrow_projected,dynamic_projected", + "Use one or more of arrow_property,dynamic_property,arrow_projected,dynamic_projected,arrow_flattened", ) if match: raise InvalidArgumentError( diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index 73aa9af42aac..6b9bae9c0cc8 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -502,6 +502,48 @@ def project_dynamic_property_graph(graph, v_prop, e_prop, v_prop_type, e_prop_ty return op +def flatten_arrow_property_graph( + graph, v_prop, e_prop, v_prop_type, e_prop_type, oid_type=None, vid_type=None +): + """Flatten arrow property graph. + + Args: + graph (:class:`nx.Graph`): A nx graph hosts an arrow property graph. + v_prop (str): The vertex property id. + e_prop (str): The edge property id. + v_prop_type (str): Type of the node attribute. + e_prop_type (str): Type of the edge attribute. + oid_type (str): Type of oid. + vid_type (str): Type of vid. + + Returns: + Operation to flatten an arrow property graph. Results in a arrow flattened graph. + """ + config = { + types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key), + types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph_def_pb2.ARROW_FLATTENED), + types_pb2.DST_GRAPH_TYPE: utils.graph_type_to_attr(graph.graph_type), + types_pb2.V_DATA_TYPE: utils.s_to_attr(utils.data_type_to_cpp(v_prop_type)), + types_pb2.E_DATA_TYPE: utils.s_to_attr(utils.data_type_to_cpp(e_prop_type)), + } + if graph.graph_type == graph_def_pb2.ARROW_PROPERTY: + config[types_pb2.V_PROP_KEY] = utils.s_to_attr(str(v_prop)) + config[types_pb2.E_PROP_KEY] = utils.s_to_attr(str(e_prop)) + config[types_pb2.OID_TYPE] = utils.s_to_attr(utils.data_type_to_cpp(oid_type)) + config[types_pb2.VID_TYPE] = utils.s_to_attr(utils.data_type_to_cpp(vid_type)) + else: + config[types_pb2.V_PROP_KEY] = utils.s_to_attr(v_prop) + config[types_pb2.E_PROP_KEY] = utils.s_to_attr(e_prop) + + op = Operation( + graph.session_id, + types_pb2.PROJECT_TO_SIMPLE, + config=config, + output_types=types_pb2.GRAPH, + ) + return op + + def copy_graph(graph, copy_type="identical"): """Create copy operation for nx graph. diff --git a/python/graphscope/nx/algorithms/builtin.py b/python/graphscope/nx/algorithms/builtin.py index d2fc01e549b1..2c11ad124232 100644 --- a/python/graphscope/nx/algorithms/builtin.py +++ b/python/graphscope/nx/algorithms/builtin.py @@ -26,6 +26,7 @@ import graphscope from graphscope import nx from graphscope.framework.app import AppAssets +from graphscope.framework.app import not_compatible_for from graphscope.framework.errors import InvalidArgumentError from graphscope.nx.utils.compat import patch_docstring from graphscope.proto import graph_def_pb2 @@ -39,7 +40,10 @@ def wrapper(*args, **kwargs): graph = args[0] if not hasattr(graph, "graph_type"): raise InvalidArgumentError("Missing graph_type attribute in graph object.") - elif graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY: + elif graph.graph_type in ( + graph_def_pb2.DYNAMIC_PROPERTY, + graph_def_pb2.ARROW_PROPERTY, + ): if ( "weight" in inspect.getfullargspec(func)[0] ): # func has 'weight' argument @@ -109,7 +113,11 @@ def pagerank(G, alpha=0.85, max_iter=100, tol=1.0e-6): """ ctx = graphscope.pagerank_nx(G, alpha, max_iter, tol) - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @project_to_simple @@ -158,7 +166,8 @@ def hits(G, max_iter=100, tol=1.0e-8, normalized=True): http://www.cs.cornell.edu/home/kleinber/auth.pdf. """ ctx = graphscope.hits(G, tolerance=tol, max_round=max_iter, normalized=normalized) - return ctx.to_dataframe({"node": "v.id", "auth": "r.auth", "hub": "r.hub"}) + df = ctx.to_dataframe({"id": "v.id", "auth": "r.auth", "hub": "r.hub"}) + return (df.set_index("id")["hub"].to_dict(), df.set_index("id")["auth"].to_dict()) @project_to_simple @@ -188,7 +197,11 @@ def degree_centrality(G): possible degree in a simple graph n-1 where n is the number of nodes in G. """ ctx = graphscope.degree_centrality(G, centrality_type="both") - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @not_implemented_for("undirected") @@ -224,7 +237,11 @@ def in_degree_centrality(G): possible degree in a simple graph n-1 where n is the number of nodes in G. """ ctx = graphscope.degree_centrality(G, centrality_type="in") - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @not_implemented_for("undirected") @@ -260,7 +277,11 @@ def out_degree_centrality(G): possible degree in a simple graph n-1 where n is the number of nodes in G. """ ctx = graphscope.degree_centrality(G, centrality_type="out") - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @project_to_simple @@ -311,7 +332,11 @@ def eigenvector_centrality(G, max_iter=100, tol=1e-06, weight=None): hits """ ctx = graphscope.eigenvector_centrality(G, tolerance=tol, max_round=max_iter) - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @project_to_simple @@ -400,7 +425,11 @@ def katz_centrality( max_round=max_iter, normalized=normalized, ) - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @project_to_simple @@ -417,7 +446,8 @@ def has_path(G, source, target): target : node Ending node for path """ - return AppAssets(algo="sssp_has_path", context="tensor")(G, source, target) + ctx = AppAssets(algo="sssp_has_path", context="tensor")(G, source, target) + return ctx.to_numpy("r", axis=0)[0] @project_to_simple @@ -461,8 +491,12 @@ def single_source_dijkstra_path_length(G, source, weight=None): Distances are calculated as sums of weighted edges traversed. """ - ctx = graphscope.sssp(G, source) - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + ctx = AppAssets(algo="sssp_projected", context="vertex_data")(G, source) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @project_to_simple @@ -495,8 +529,7 @@ def average_shortest_path_length(G, weight=None): 2.0 """ - ctx = AppAssets(algo="sssp_average_length", context="tensor")(G) - return ctx.to_numpy("r", axis=0)[0] + return graphscope.average_shortest_path_length(G) @project_to_simple @@ -655,7 +688,11 @@ def closeness_centrality(G, weight=None, wf_improved=True): Cambridge University Press. """ ctx = AppAssets(algo="closeness_centrality", context="vertex_data")(G, wf_improved) - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @patch_docstring(nxa.bfs_tree) @@ -795,7 +832,11 @@ def clustering(G): # FIXME(weibin): clustering now only correct in directed graph. # FIXME: nodes and weight not support. ctx = graphscope.clustering(G) - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @project_to_simple @@ -822,7 +863,11 @@ def triangles(G, nodes=None): """ # FIXME: nodes not support. ctx = graphscope.triangles(G) - return ctx.to_dataframe({"node": "v.id", "result": "r"}) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) @project_to_simple @@ -897,7 +942,12 @@ def weakly_connected_components(G): 1 if the vertex satisfies k-core, otherwise 0. """ - return AppAssets(algo="wcc_projected", context="vertex_data")(G) + ctx = AppAssets(algo="wcc_projected", context="vertex_data")(G) + return ( + ctx.to_dataframe({"id": "v.id", "component": "r"}) + .set_index("id")["component"] + .to_dict() + ) @project_to_simple @@ -1472,4 +1522,8 @@ def betweenness_centrality( ctx = AppAssets(algo=algorithm, context="vertex_data")( G, normalized=normalized, endpoints=endpoints ) - return dict(zip(ctx.to_numpy("v.id"), ctx.to_numpy("r"))) + return ( + ctx.to_dataframe({"id": "v.id", "value": "r"}) + .set_index("id")["value"] + .to_dict() + ) diff --git a/python/graphscope/nx/classes/graph.py b/python/graphscope/nx/classes/graph.py index 1ed299c73934..c6b1addbd5f2 100644 --- a/python/graphscope/nx/classes/graph.py +++ b/python/graphscope/nx/classes/graph.py @@ -397,6 +397,14 @@ def template_str(self): ) vid_type = self._schema.vid_type return f"vineyard::ArrowFragment<{oid_type},{vid_type}>" + elif self._graph_type == graph_def_pb2.ARROW_FLATTENED: + oid_type = utils.normalize_data_type_str( + utils.data_type_to_cpp(self._schema.oid_type) + ) + vid_type = self._schema.vid_type + vdata_type = utils.data_type_to_cpp(self._schema.vdata_type) + edata_type = utils.data_type_to_cpp(self._schema.edata_type) + return f"gs::ArrowFlattenedFragment<{oid_type},{vid_type},{vdata_type},{edata_type}>" else: raise ValueError(f"Unsupported graph type: {self._graph_type}") @@ -2037,9 +2045,9 @@ def _batch_get_degree( return op.eval() def _project_to_simple(self, v_prop=None, e_prop=None): - """Project nx graph to a simple graph to run builtin alogorithms. + """Project nx graph to a simple graph to run builtin algorithms. - A simple graph is a accesser wrapper of property graph that only single edge + A simple graph is a wrapper of property graph that only single edge attribute and single node attribute are available. Parameters @@ -2065,6 +2073,7 @@ def _project_to_simple(self, v_prop=None, e_prop=None): if v_prop is None: v_prop = str(v_prop) + v_prop_id = -1 v_prop_type = graph_def_pb2.NULLVALUE else: check_argument(isinstance(v_prop, str)) @@ -2081,6 +2090,7 @@ def _project_to_simple(self, v_prop=None, e_prop=None): if e_prop is None: e_prop = str(e_prop) + e_prop_id = -1 e_prop_type = graph_def_pb2.NULLVALUE else: check_argument(isinstance(e_prop, str)) @@ -2092,13 +2102,25 @@ def _project_to_simple(self, v_prop=None, e_prop=None): raise InvalidArgumentError( "graph not contains the edge property {}".format(e_prop) ) - op = dag_utils.project_dynamic_property_graph( - self, v_prop, e_prop, v_prop_type, e_prop_type - ) - graph_def = op.eval(leaf=False) graph = self.__class__(create_empty_in_engine=False) graph = nx.freeze(graph) - graph._graph_type = graph_def_pb2.DYNAMIC_PROJECTED + if self.graph_type == graph_def_pb2.DYNAMIC_PROPERTY: + op = dag_utils.project_dynamic_property_graph( + self, v_prop, e_prop, v_prop_type, e_prop_type + ) + graph._graph_type = graph_def_pb2.DYNAMIC_PROJECTED + else: + op = dag_utils.flatten_arrow_property_graph( + self, + v_prop_id, + e_prop_id, + v_prop_type, + e_prop_type, + self.schema.oid_type, + self.schema.vid_type, + ) + graph._graph_type = graph_def_pb2.ARROW_FLATTENED + graph_def = op.eval(leaf=False) graph._key = graph_def.key graph._session = self._session graph.schema.from_graph_def(graph_def) diff --git a/python/graphscope/nx/tests/algorithms/builtin/test_shortest_paths.py b/python/graphscope/nx/tests/algorithms/builtin/test_shortest_paths.py index c8cd391c3d65..6f1764a62cfd 100644 --- a/python/graphscope/nx/tests/algorithms/builtin/test_shortest_paths.py +++ b/python/graphscope/nx/tests/algorithms/builtin/test_shortest_paths.py @@ -29,13 +29,13 @@ def test_run_average_shortest_path_length(self): nx.builtin.average_shortest_path_length(self.G, weight="weight") def test_run_has_path(self): - nx.builtin.has_path(self.G, source=0, target=3) + assert nx.builtin.has_path(self.G, source=0, target=3) def test_shortest_path_length_on_reverse_view(self): ret1 = nx.builtin.single_source_dijkstra_path_length( self.DG, source=2, weight="weight" ) - assert replace_with_inf(dict(ret1.values)) == { + assert replace_with_inf(ret1) == { 0.0: float("inf"), 1.0: float("inf"), 2.0: 0.0, @@ -46,7 +46,7 @@ def test_shortest_path_length_on_reverse_view(self): ret2 = nx.builtin.single_source_dijkstra_path_length( RDG, source=2, weight="weight" ) - assert replace_with_inf(dict(ret2.values)) == { + assert replace_with_inf(ret2) == { 0.0: 1.0, 1.0: 1.0, 2.0: 0.0, @@ -58,12 +58,12 @@ def test_shortest_path_length_on_directed_view(self): ret1 = nx.builtin.single_source_dijkstra_path_length( self.G, source=2, weight="weight" ) - assert dict(ret1.values) == {0.0: 1.0, 1.0: 1.0, 2.0: 0.0, 3.0: 1.0, 4.0: 2.0} + assert ret1 == {0.0: 1.0, 1.0: 1.0, 2.0: 0.0, 3.0: 1.0, 4.0: 2.0} DG = self.G.to_directed(as_view=True) ret2 = nx.builtin.single_source_dijkstra_path_length( DG, source=2, weight="weight" ) - assert dict(ret2.values) == {0.0: 1.0, 1.0: 1.0, 2.0: 0.0, 3.0: 1.0, 4.0: 2.0} + assert ret2 == {0.0: 1.0, 1.0: 1.0, 2.0: 0.0, 3.0: 1.0, 4.0: 2.0} def test_all_pairs_shortest_path_length(self): cycle = nx.cycle_graph(7) diff --git a/python/graphscope/nx/tests/classes/test_graph.py b/python/graphscope/nx/tests/classes/test_graph.py index 5ba5bdb3a26d..51f525d15341 100644 --- a/python/graphscope/nx/tests/classes/test_graph.py +++ b/python/graphscope/nx/tests/classes/test_graph.py @@ -1,4 +1,3 @@ -# # This file is referred and derived from project NetworkX # # which has the following license: @@ -19,6 +18,7 @@ import pytest from networkx.classes.tests.test_graph import TestEdgeSubgraph as _TestEdgeSubgraph from networkx.classes.tests.test_graph import TestGraph as _TestGraph +from networkx.testing.utils import almost_equal from networkx.testing.utils import assert_graphs_equal from graphscope import nx @@ -243,132 +243,102 @@ def test_update(self): def test_duplicated_modification(self): G = nx.complete_graph(5, create_using=self.Graph) - ret_frame = nx.builtin.closeness_centrality(G) - assert np.allclose( - ret_frame.sort_values(by=["node"]).to_numpy(), - [[0.0, 1.000], [1.0, 1.000], [2.0, 1.000], [3.0, 1.000], [4.0, 1.000]], - ) + ret = nx.builtin.closeness_centrality(G) + assert ret == {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0} # test add node G.add_node(5) - ret_frame2 = nx.builtin.closeness_centrality(G) - assert np.allclose( - ret_frame2.sort_values(by=["node"]).to_numpy(), - [[0.0, 0.8], [1.0, 0.8], [2.0, 0.8], [3.0, 0.8], [4.0, 0.8], [5.0, 0.0]], - ) + ret = nx.builtin.closeness_centrality(G) + assert ret == {0: 0.8, 1: 0.8, 2: 0.8, 3: 0.8, 4: 0.8, 5: 0.0} + # test add edge G.add_edge(4, 5) - ret_frame3 = nx.builtin.closeness_centrality(G) - expect1 = [ - [0.0, 0.8], - [1.0, 0.8], - [2.0, 0.8], - [3.0, 0.8], - [4.0, 0.8], - [5.0, 0.555556], - ] - expect2 = [ - [0.0, 0.833333], - [1.0, 0.833333], - [2.0, 0.833333], - [3.0, 0.833333], - [4.0, 1.0], - [5.0, 0.555556], - ] + ret = nx.builtin.closeness_centrality(G) + expect1 = { + 0: 0.8, + 1: 0.8, + 2: 0.8, + 3: 0.8, + 4: 0.8, + 5: 0.555556, + } + expect2 = { + 0: 0.833333, + 1: 0.833333, + 2: 0.833333, + 3: 0.833333, + 4: 1.0, + 5: 0.555556, + } if G.is_directed(): - assert np.allclose( - ret_frame3.sort_values(by=["node"]).to_numpy(), - expect1, - ) + for n in ret: + assert almost_equal(ret[n], expect1[n], places=4) else: - assert np.allclose( - ret_frame3.sort_values(by=["node"]).to_numpy(), - expect2, - ) + for n in ret: + assert almost_equal(ret[n], expect2[n], places=4) + # test remove edge G.remove_edge(4, 5) - ret_frame4 = nx.builtin.closeness_centrality(G) - assert np.allclose( - ret_frame4.sort_values(by=["node"]).to_numpy(), - [[0.0, 0.8], [1.0, 0.8], [2.0, 0.8], [3.0, 0.8], [4.0, 0.8], [5.0, 0.0]], - ) + ret = nx.builtin.closeness_centrality(G) + assert ret == {0: 0.8, 1: 0.8, 2: 0.8, 3: 0.8, 4: 0.8, 5: 0.0} + # test remove node G.remove_node(5) - ret_frame5 = nx.builtin.closeness_centrality(G) - assert np.allclose( - ret_frame5.sort_values(by=["node"]).to_numpy(), - [[0.0, 1.000], [1.0, 1.000], [2.0, 1.000], [3.0, 1.000], [4.0, 1.000]], - ) + ret = nx.builtin.closeness_centrality(G) + assert ret == {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0} + # test update for e in G.edges: G.edges[e]["weight"] = 2 - ret_frame6 = nx.builtin.closeness_centrality(G, weight="weight") - assert np.allclose( - ret_frame6.sort_values(by=["node"]).to_numpy(), - [[0.0, 0.5], [1.0, 0.5], [2.0, 0.5], [3.0, 0.5], [4.0, 0.5]], - ) + ret = nx.builtin.closeness_centrality(G, weight="weight") + assert ret == {0: 0.5, 1: 0.5, 2: 0.5, 3: 0.5, 4: 0.5} + # test copy G2 = G.copy() - ret_frame7 = nx.builtin.closeness_centrality(G2) - assert np.allclose( - ret_frame7.sort_values(by=["node"]).to_numpy(), - [[0.0, 1.000], [1.0, 1.000], [2.0, 1.000], [3.0, 1.000], [4.0, 1.000]], - ) + ret = nx.builtin.closeness_centrality(G2) + assert ret == {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0} + # test reverse if G.is_directed(): rG = G.reverse() - ret_frame8 = nx.builtin.closeness_centrality(rG) - assert np.allclose( - ret_frame8.sort_values(by=["node"]).to_numpy(), - [[0.0, 1.000], [1.0, 1.000], [2.0, 1.000], [3.0, 1.000], [4.0, 1.000]], - ) + ret = nx.builtin.closeness_centrality(rG) + assert ret == {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0} + # to_directed/to_undirected if G.is_directed(): udG = G.to_undirected() - ret_frame9 = nx.builtin.closeness_centrality(udG) - assert np.allclose( - ret_frame9.sort_values(by=["node"]).to_numpy(), - [[0.0, 1.000], [1.0, 1.000], [2.0, 1.000], [3.0, 1.000], [4.0, 1.000]], - ) + ret = nx.builtin.closeness_centrality(udG) + assert ret == {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0} else: dG = G.to_directed() - ret_frame10 = nx.builtin.closeness_centrality(dG) - assert np.allclose( - ret_frame10.sort_values(by=["node"]).to_numpy(), - [[0.0, 1.000], [1.0, 1.000], [2.0, 1.000], [3.0, 1.000], [4.0, 1.000]], - ) + ret = nx.builtin.closeness_centrality(dG) + assert ret == {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0} + # sub_graph sG = G.subgraph([0, 1, 2]) - ret_frame11 = nx.builtin.closeness_centrality(sG) - assert np.allclose( - ret_frame11.sort_values(by=["node"]).to_numpy(), - [[0.0, 1.000], [1.0, 1.000], [2.0, 1.000]], - ) + ret = nx.builtin.closeness_centrality(sG) + assert ret == {0: 1.0, 1: 1.0, 2: 1.0} esG = G.edge_subgraph([(0, 1), (1, 2), (2, 3)]) - ret_frame12 = nx.builtin.closeness_centrality(esG) - expect1 = [ - [0.0, 0.000], - [1.0, 0.333333], - [2.0, 0.444444], - [3.0, 0.500], - ] - expect2 = [ - [0.0, 0.5], - [1.0, 0.75], - [2.0, 0.75], - [3.0, 0.5], - ] + ret = nx.builtin.closeness_centrality(esG) + expect1 = { + 0: 0.000, + 1: 0.333333, + 2: 0.444444, + 3: 0.500, + } + expect2 = { + 0: 0.5, + 1: 0.75, + 2: 0.75, + 3: 0.5, + } if G.is_directed(): - assert np.allclose( - ret_frame12.sort_values(by=["node"]).to_numpy(), - expect1, - ) + for n in ret: + assert almost_equal(ret[n], expect1[n], places=4) else: - assert np.allclose( - ret_frame12.sort_values(by=["node"]).to_numpy(), - expect2, - ) + for n in ret: + assert almost_equal(ret[n], expect2[n], places=4) @pytest.mark.usefixtures("graphscope_session") diff --git a/python/graphscope/nx/tests/conftest.py b/python/graphscope/nx/tests/conftest.py index 6f3710677b3b..fe827f7fbe70 100644 --- a/python/graphscope/nx/tests/conftest.py +++ b/python/graphscope/nx/tests/conftest.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os import pytest @@ -28,7 +27,6 @@ def graphscope_session(): graphscope.set_option(initializing_interactive_engine=False) sess = graphscope.session(cluster_type="hosts") - sess.as_default() yield sess sess.close() diff --git a/python/graphscope/nx/tests/test_copy_on_write.py b/python/graphscope/nx/tests/test_copy_on_write.py new file mode 100644 index 000000000000..86dd1584c037 --- /dev/null +++ b/python/graphscope/nx/tests/test_copy_on_write.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os + +import pandas as pd +import pytest +from networkx.testing.utils import assert_graphs_equal + +import graphscope +import graphscope.nx as nx +from graphscope.framework.errors import InvalidArgumentError +from graphscope.framework.loader import Loader +from graphscope.nx.tests.classes.test_digraph import TestDiGraph as _TestDiGraph +from graphscope.nx.tests.classes.test_graph import TestGraph as _TestGraph +from graphscope.nx.tests.utils import almost_equal +from graphscope.nx.tests.utils import replace_with_inf + + +def k3_graph(prefix, directed): + graph = graphscope.g(directed=directed, generate_eid=False) + graph = graph.add_vertices( + Loader(os.path.join(prefix, "3v.csv"), delimiter="|"), "vertex" + ) + if directed: + graph = graph.add_edges( + Loader(os.path.join(prefix, "k3_directed.csv"), delimiter="|"), + "edge", + ) + else: + graph = graph.add_edges( + Loader(os.path.join(prefix, "k3_undirected.csv"), delimiter="|"), + "edge", + ) + return graph + + +def p3_graph(prefix, directed): + graph = graphscope.g(directed=directed, generate_eid=False) + graph = graph.add_vertices( + Loader(os.path.join(prefix, "3v.csv"), delimiter="|"), "vertex" + ) + graph = graph.add_edges( + Loader(os.path.join(prefix, "p3_directed.csv"), delimiter="|"), + "edge", + ) + return graph + + +def simple_label_graph(prefix, directed): + graph = graphscope.g(directed=directed, generate_eid=False) + graph = graph.add_vertices(Loader(os.path.join(prefix, "simple_v_0.csv")), "v-0") + graph = graph.add_vertices(Loader(os.path.join(prefix, "simple_v_1.csv")), "v-1") + graph = graph.add_edges( + Loader(os.path.join(prefix, "simple_e_0.csv")), + "e-0", + src_label="v-0", + dst_label="v-0", + ) + graph = graph.add_edges( + Loader(os.path.join(prefix, "simple_e_1.csv")), + "e-1", + src_label="v-0", + dst_label="v-1", + ) + graph = graph.add_edges( + Loader(os.path.join(prefix, "simple_e_2.csv")), + "e-2", + src_label="v-1", + dst_label="v-1", + ) + return graph + + +def p2p_31_graph(prefix, directed): + graph = graphscope.g(directed=directed, generate_eid=False) + graph = graph.add_vertices( + Loader(os.path.join(prefix, "p2p-31.v"), delimiter=" ", header_row=False), + "vertex", + ) + graph = graph.add_edges( + Loader(os.path.join(prefix, "p2p-31.e"), delimiter=" ", header_row=False), + "edge", + ) + return graph + + +@pytest.mark.usefixtures("graphscope_session") +class TestGraphCopyOnWrite(_TestGraph): + def setup_method(self): + self.Graph = nx.Graph + self.k3nodes = [0, 1, 2] + self.k3edges = [(0, 1), (0, 2), (1, 2)] + data_dir = os.path.expandvars("${GS_TEST_DIR}/networkx") + self.k3 = k3_graph(data_dir, False) + self.K3 = nx.Graph(self.k3, default_label="vertex") + + def test_update(self): + # specify both edgees and nodes + G = self.K3.copy() + G.update(nodes=[3, (4, {"size": 2})], edges=[(4, 5), (6, 7, {"weight": 2})]) + nlist = [ + (0, {}), + (1, {}), + (2, {}), + (3, {}), + (4, {"size": 2}), + (5, {}), + (6, {}), + (7, {}), + ] + assert sorted(G.nodes.data()) == nlist + if G.is_directed(): + elist = [ + (0, 1, {}), + (0, 2, {}), + (1, 0, {}), + (1, 2, {}), + (2, 0, {}), + (2, 1, {}), + (4, 5, {}), + (6, 7, {"weight": 2}), + ] + else: + elist = [ + (0, 1, {}), + (2, 0, {}), # N.B: diff with _TestGraph, update the order of id + (2, 1, {}), + (4, 5, {}), + (6, 7, {"weight": 2}), + ] + assert sorted(G.edges.data()) == elist + assert G.graph == {} + + # no keywords -- order is edges, nodes + G = self.K3.copy() + G.update([(4, 5), (6, 7, {"weight": 2})], [3, (4, {"size": 2})]) + assert sorted(G.nodes.data()) == nlist + assert sorted(G.edges.data()) == elist + assert G.graph == {} + + # update using only a graph + G = self.Graph() + G.graph["foo"] = "bar" + G.add_node(2, data=4) + G.add_edge(0, 1, weight=0.5) + GG = G.copy() + H = self.Graph() + GG.update(H) + assert_graphs_equal(G, GG) + H.update(G) + assert_graphs_equal(H, G) + + # update nodes only + H = self.Graph() + H.update(nodes=[3, 4]) + assert H.nodes ^ {3, 4} == set() + assert H.size() == 0 + + # update edges only + H = self.Graph() + H.update(edges=[(3, 4)]) + if H.is_directed(): + assert sorted(H.edges.data()) == [(3, 4, {})] + else: + assert sorted(H.edges.data()) == [(4, 3, {})] + assert H.size() == 1 + + # No inputs -> exception + with pytest.raises(nx.NetworkXError): + nx.Graph().update() + + +@pytest.mark.usefixtures("graphscope_session") +class TestDiGraphCopyOnWrite(_TestDiGraph): + def setup_method(self): + data_dir = os.path.expandvars("${GS_TEST_DIR}/networkx") + self.Graph = nx.DiGraph + # build K3 + self.k3edges = [(0, 1), (0, 2), (1, 2)] + self.k3nodes = [0, 1, 2] + self.k3 = k3_graph(data_dir, True) + self.K3 = nx.DiGraph(self.k3, default_label="vertex") + + self.p3 = p3_graph(data_dir, True) + self.P3 = nx.DiGraph(self.p3, default_label="vertex") + + +@pytest.mark.usefixtures("graphscope_session") +class TestBuiltinCopyOnWrite: + def setup_method(self): + data_dir = os.path.expandvars("${GS_TEST_DIR}/networkx") + p2p_dir = os.path.expandvars("${GS_TEST_DIR}") + + self.simple = simple_label_graph(data_dir, True) + self.SG = nx.DiGraph(self.simple, default_label="v-0") + self.SG.pagerank = { + 1: 0.03721197, + 2: 0.05395735, + 3: 0.04150565, + 4: 0.37508082, + 5: 0.20599833, + 6: 0.28624589, + } + self.SG.auth = { + 1: 0.165000, + 2: 0.243018, + 3: 0.078017, + 4: 0.078017, + 5: 0.270943, + 6: 0.165000, + } + self.SG.hub = { + 1: 0.182720, + 2: 0.0, + 3: 0.386437, + 4: 0.248121, + 5: 0.138316, + 6: 0.044404, + } + self.SG.eigen = { + 1: 3.201908045277076e-06, + 2: 6.4038160905537886e-06, + 3: 3.201908045277076e-06, + 5: 0.40044823300165794, + 4: 0.6479356498234745, + 6: 0.6479356498234745, + } + self.SG.katz = { + 1: 0.37871516522035104, + 2: 0.4165866814015425, + 3: 0.37871516522035104, + 5: 0.42126739520601203, + 4: 0.4255225997990211, + 6: 0.4255225997990211, + } + + # FIXME(acezen): p2p_31_graph loading fail in ci, open when fixed the problem. (fixme) + # self.p2p_31 = p2p_31_graph(p2p_dir, False) + # self.P2P = nx.Graph(self.p2p_31, default_label="vertex") + # self.P2P.sssp = dict( + # pd.read_csv( + # "{}/p2p-31-sssp".format(os.path.expandvars("${GS_TEST_DIR}")), + # sep=" ", + # header=None, + # prefix="", + # ).values + # ) + + def test_single_source_dijkstra_path_length(self): + ret = nx.builtin.single_source_dijkstra_path_length( + self.SG, source=1, weight="weight" + ) + assert ret == {1: 0.0, 2: 1.0, 3: 1.0, 4: 3.0, 5: 2.0, 6: 3.0} + # p2p_ans = nx.builtin.single_source_dijkstra_path_length( + # self.P2P, source=6, weight="f2" + # ) + # assert replace_with_inf(p2p_ans) == self.P2P.sssp + + def test_wcc(self): + ret = nx.builtin.weakly_connected_components(self.SG) + assert ret == {1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0} + + def test_pagerank(self): + p = nx.builtin.pagerank(self.SG, alpha=0.9, tol=1.0e-08) + for n in p: + assert almost_equal(p[n], self.SG.pagerank[n], places=4) + + def test_hits(self): + h, a = nx.builtin.hits(self.SG, tol=1.0e-08) + for n in h: + assert almost_equal(h[n], self.SG.hub[n], places=4) + assert almost_equal(a[n], self.SG.auth[n], places=4) + + def test_degree_centrality(self): + ret = nx.builtin.degree_centrality(self.SG) + assert ret == { + 1: 0.6, + 2: 0.4, + 3: 0.8, + 5: 0.8, + 4: 0.8, + 6: 0.6, + } + + def test_eigenvector_centrality(self): + ret = nx.builtin.eigenvector_centrality(self.SG) + for n in ret: + assert almost_equal(ret[n], self.SG.eigen[n], places=12) + + def test_katz_centrality(self): + ret = nx.builtin.katz_centrality(self.SG) + for n in ret: + assert almost_equal(ret[n], self.SG.katz[n], places=12) + + def test_has_path(self): + assert nx.builtin.has_path(self.SG, source=1, target=6) + + def test_average_shortest_path_length(self): + # average_shortest_path_length implementation contain grape::VertexDenseSet which + # can not use with ArrowFlattenedFragment + with pytest.raises(InvalidArgumentError): + nx.builtin.average_shortest_path_length(self.SG) + + def test_bfs_edges(self): + ret = nx.builtin.bfs_edges(self.SG, 1, depth_limit=10) + assert sorted(ret) == [[1, 3], [3, 5], [4, 6], [5, 4]] + + def bfs_tree(self): + ret = nx.builtin.bfs_tree(self.SG, 1, depth_limit=10) + assert sorted(ret) == [1, 2, 3, 4, 5, 6] + + def test_k_core(self): + # k_core implementation contain grape::VertexDenseSet which + # can not use with ArrowFlattenedFragment + with pytest.raises(InvalidArgumentError): + nx.builtin.k_core(self.SG, k=1) + + def test_clustering(self): + ret = nx.builtin.clustering(self.SG) + assert ret == {1: 0.5, 2: 1.0, 3: 0.2, 5: 0.4, 4: 0.5, 6: 1.0} + + def test_triangles(self): + # triangles implementation contain grape::VertexDenseSet which + # can not use with ArrowFlattenedFragment + with pytest.raises(InvalidArgumentError): + nx.builtin.triangles(self.SG) + + def test_average_clustering(self): + ret = nx.builtin.average_clustering(self.SG) + assert almost_equal(ret, 0.6, places=4) + + def test_degree_assortativity_coefficient(self): + ret = nx.builtin.degree_assortativity_coefficient(self.SG) + assert almost_equal(ret, -0.25000000000000033, places=12) + + def test_node_boundary(self): + ret = nx.builtin.node_boundary(self.SG, [1, 2]) + assert ret == [3] + + def test_edge_boundary(self): + ret = nx.builtin.edge_boundary(self.SG, [1, 2]) + assert ret == [[2, 3], [1, 3]] + + def test_attribute_assortativity_coefficient(self): + ret = nx.builtin.attribute_assortativity_coefficient(self.SG, attribute="attr") + assert almost_equal(ret, -0.17647058823529418, places=12) + + def test_numeric_assortativity_coefficient(self): + ret = nx.builtin.numeric_assortativity_coefficient(self.SG, attribute="attr") + assert almost_equal(ret, 0.5383819020581653, places=12) diff --git a/python/graphscope/nx/tests/test_ctx_builtin.py b/python/graphscope/nx/tests/test_ctx_builtin.py index c8527ef9d27f..88455a8335e7 100644 --- a/python/graphscope/nx/tests/test_ctx_builtin.py +++ b/python/graphscope/nx/tests/test_ctx_builtin.py @@ -1,4 +1,5 @@ import os +from posixpath import expanduser import numpy as np import pandas as pd @@ -154,14 +155,12 @@ def test_single_source_dijkstra_path_length(self): ret = nx.builtin.single_source_dijkstra_path_length( self.grid, 1, weight="weight" ) - ans = dict(ret.astype(np.int64).values) - assert ans == self.grid_ans + assert ret == self.grid_ans ret = nx.builtin.single_source_dijkstra_path_length( self.p2p_undirected, 6, weight="weight" ) - ans = dict(ret.values) - assert replace_with_inf(ans) == self.p2p_length_ans + assert replace_with_inf(ret) == self.p2p_length_ans def test_subgraph_single_source_dijkstra_path_length(self): # test subgraph and edge_subgraph with p2p_subgraph_undirected @@ -170,10 +169,10 @@ def test_subgraph_single_source_dijkstra_path_length(self): ) SG = self.p2p_undirected.subgraph(self.p2p_subgraph_undirected.nodes) ret_sg = nx.builtin.single_source_dijkstra_path_length(SG, 6, weight="weight") - assert dict(ret.values) == dict(ret_sg.values) + assert ret == ret_sg ESG = self.p2p_undirected.edge_subgraph(self.p2p_subgraph_undirected.edges) ret_esg = nx.builtin.single_source_dijkstra_path_length(ESG, 6, weight="weight") - assert dict(ret.values) == dict(ret_esg.values) + assert ret == ret_esg # test subgraph and edge_subgraph with p2p directed ret2 = nx.builtin.single_source_dijkstra_path_length( @@ -181,12 +180,12 @@ def test_subgraph_single_source_dijkstra_path_length(self): ) SDG = self.p2p.subgraph(self.p2p_subgraph.nodes) ret_sdg = nx.builtin.single_source_dijkstra_path_length(SDG, 6, weight="weight") - assert dict(ret2.values) == dict(ret_sdg.values) + assert ret2 == ret_sdg ESDG = self.p2p.edge_subgraph(self.p2p_subgraph.edges) ret_esdg = nx.builtin.single_source_dijkstra_path_length( ESDG, 6, weight="weight" ) - assert dict(ret2.values) == dict(ret_esdg.values) + assert ret2 == ret_esdg def test_shortest_path(self): ctx1 = nx.builtin.shortest_path(self.grid, source=1, weight="weight") @@ -194,23 +193,20 @@ def test_shortest_path(self): assert ret1 == self.grid_path_ans def test_has_path(self): - ctx = nx.builtin.has_path(self.grid, source=1, target=6) - assert ctx.to_numpy("r", axis=0)[0] - ctx = nx.builtin.has_path(self.p2p, source=6, target=3728) - assert not ctx.to_numpy("r", axis=0)[0] - ctx = nx.builtin.has_path(self.p2p, source=6, target=3723) - assert ctx.to_numpy("r", axis=0)[0] + assert nx.builtin.has_path(self.grid, source=1, target=6) + assert not nx.builtin.has_path(self.p2p, source=6, target=3728) + assert nx.builtin.has_path(self.p2p, source=6, target=3723) def test_average_shortest_path_length(self): ret = nx.builtin.average_shortest_path_length(self.grid, weight="weight") assert ret == 2.6666666666666665 def test_degree_centrality(self): - ans = dict(nx.builtin.degree_centrality(self.p2p).values) + ans = nx.builtin.degree_centrality(self.p2p) self.assert_result_almost_equal(ans, self.p2p_dc_ans) def test_eigenvector_centrality(self): - ans = dict(nx.builtin.eigenvector_centrality(self.p2p, weight="weight").values) + ans = nx.builtin.eigenvector_centrality(self.p2p, weight="weight") self.assert_result_almost_equal(ans, self.p2p_ev_ans) @pytest.mark.skip( @@ -221,20 +217,28 @@ def test_katz_centrality(self): self.assert_result_almost_equal(ans, self.p2p_kz_ans) def test_hits(self): - expected_hub = self.p2p_hits_ans[1].to_numpy(dtype=float) - expected_auth = self.p2p_hits_ans[2].to_numpy(dtype=float) - df = nx.builtin.hits(self.p2p, tol=0.001).sort_values(by=["node"]) - auth = df["auth"].to_numpy(dtype=float) - hub = df["hub"].to_numpy(dtype=float) - np.allclose(auth, expected_auth) - np.allclose(hub, expected_hub) + expected_hub = dict( + zip( + self.p2p_hits_ans[0].to_numpy(dtype=int), + self.p2p_hits_ans[1].to_numpy(dtype=float), + ) + ) + expected_auth = dict( + zip( + self.p2p_hits_ans[0].to_numpy(dtype=int), + self.p2p_hits_ans[2].to_numpy(dtype=float), + ) + ) + hub, auth = nx.builtin.hits(self.p2p, tol=0.001) + self.assert_result_almost_equal(hub, expected_hub) + self.assert_result_almost_equal(auth, expected_auth) def test_clustering(self): - ans = dict(nx.builtin.clustering(self.p2p).values) + ans = nx.builtin.clustering(self.p2p) self.assert_result_almost_equal(ans, self.p2p_clus_ans) def test_triangles(self): - ans = dict(nx.builtin.triangles(self.p2p_undirected).values) + ans = nx.builtin.triangles(self.p2p_undirected) self.assert_result_almost_equal(ans, self.p2p_triangles_ans) def test_average_clustering(self): @@ -244,7 +248,7 @@ def test_weakly_connected_components(self): ret = nx.builtin.weakly_connected_components(self.p2p_undirected) def test_pagerank(self): - ans = dict(nx.builtin.pagerank(self.p2p).values) + ans = nx.builtin.pagerank(self.p2p) self.assert_result_almost_equal(ans, self.p2p_pagerank_ans) def test_degree_assortativity_coefficient(self): diff --git a/python/graphscope/nx/tests/test_nx.py b/python/graphscope/nx/tests/test_transformation.py similarity index 85% rename from python/graphscope/nx/tests/test_nx.py rename to python/graphscope/nx/tests/test_transformation.py index 20cfa4f9f4c8..877a24cca2d7 100644 --- a/python/graphscope/nx/tests/test_nx.py +++ b/python/graphscope/nx/tests/test_transformation.py @@ -27,43 +27,9 @@ import graphscope.nx as nx from graphscope.client.session import g from graphscope.client.session import get_default_session -from graphscope.framework.errors import AnalyticalEngineInternalError from graphscope.framework.errors import InvalidArgumentError from graphscope.framework.loader import Loader -from graphscope.nx.tests.classes.test_digraph import TestDiGraph as _TestDiGraph -from graphscope.nx.tests.classes.test_graph import TestGraph as _TestGraph from graphscope.proto import graph_def_pb2 -from graphscope.proto.types_pb2 import SRC_LABEL - - -def k3_graph(prefix, directed): - graph = graphscope.g(directed=directed, generate_eid=False) - graph = graph.add_vertices( - Loader(os.path.join(prefix, "3v.csv"), delimiter="|"), "vertex" - ) - if directed: - graph = graph.add_edges( - Loader(os.path.join(prefix, "k3_directed.csv"), delimiter="|"), - "edge", - ) - else: - graph = graph.add_edges( - Loader(os.path.join(prefix, "k3_undirected.csv"), delimiter="|"), - "edge", - ) - return graph - - -def p3_graph(prefix, directed): - graph = graphscope.g(directed=directed, generate_eid=False) - graph = graph.add_vertices( - Loader(os.path.join(prefix, "3v.csv"), delimiter="|"), "vertex" - ) - graph = graph.add_edges( - Loader(os.path.join(prefix, "p3_directed.csv"), delimiter="|"), - "edge", - ) - return graph def ldbc_sample_single_label(prefix, directed): @@ -474,7 +440,7 @@ def test_gs_to_nx_with_sssp(self): ret2 = nx.builtin.single_source_dijkstra_path_length( self.p2p_nx, 6, weight="weight" ) - assert dict(ret.values) == dict(ret2.values) + assert ret == ret2 def test_error_on_wrong_nx_type(self): g = self.single_label_g @@ -712,104 +678,3 @@ def test_error_import_with_wrong_session(self): ): nx = self.session_lazy.nx() self.session_lazy.close() - - -@pytest.mark.usefixtures("graphscope_session") -class TestGraphCopyOnWrite(_TestGraph): - def setup_method(self): - self.Graph = nx.Graph - self.k3nodes = [0, 1, 2] - self.k3edges = [(0, 1), (0, 2), (1, 2)] - data_dir = os.path.expandvars("${GS_TEST_DIR}/networkx") - self.k3 = k3_graph(data_dir, False) - self.K3 = nx.Graph(self.k3, default_label="vertex") - - def test_update(self): - # specify both edgees and nodes - G = self.K3.copy() - G.update(nodes=[3, (4, {"size": 2})], edges=[(4, 5), (6, 7, {"weight": 2})]) - nlist = [ - (0, {}), - (1, {}), - (2, {}), - (3, {}), - (4, {"size": 2}), - (5, {}), - (6, {}), - (7, {}), - ] - assert sorted(G.nodes.data()) == nlist - if G.is_directed(): - elist = [ - (0, 1, {}), - (0, 2, {}), - (1, 0, {}), - (1, 2, {}), - (2, 0, {}), - (2, 1, {}), - (4, 5, {}), - (6, 7, {"weight": 2}), - ] - else: - elist = [ - (0, 1, {}), - (2, 0, {}), # N.B: diff with _TestGraph, update the order of id - (2, 1, {}), - (4, 5, {}), - (6, 7, {"weight": 2}), - ] - assert sorted(G.edges.data()) == elist - assert G.graph == {} - - # no keywords -- order is edges, nodes - G = self.K3.copy() - G.update([(4, 5), (6, 7, {"weight": 2})], [3, (4, {"size": 2})]) - assert sorted(G.nodes.data()) == nlist - assert sorted(G.edges.data()) == elist - assert G.graph == {} - - # update using only a graph - G = self.Graph() - G.graph["foo"] = "bar" - G.add_node(2, data=4) - G.add_edge(0, 1, weight=0.5) - GG = G.copy() - H = self.Graph() - GG.update(H) - assert_graphs_equal(G, GG) - H.update(G) - assert_graphs_equal(H, G) - - # update nodes only - H = self.Graph() - H.update(nodes=[3, 4]) - assert H.nodes ^ {3, 4} == set() - assert H.size() == 0 - - # update edges only - H = self.Graph() - H.update(edges=[(3, 4)]) - if H.is_directed(): - assert sorted(H.edges.data()) == [(3, 4, {})] - else: - assert sorted(H.edges.data()) == [(4, 3, {})] - assert H.size() == 1 - - # No inputs -> exception - with pytest.raises(nx.NetworkXError): - nx.Graph().update() - - -@pytest.mark.usefixtures("graphscope_session") -class TestDiGraphCopyOnWrite(_TestDiGraph): - def setup_method(self): - self.Graph = nx.DiGraph - # build K3 - self.k3edges = [(0, 1), (0, 2), (1, 2)] - self.k3nodes = [0, 1, 2] - data_dir = os.path.expandvars("${GS_TEST_DIR}/networkx") - self.k3 = k3_graph(data_dir, True) - self.K3 = nx.DiGraph(self.k3, default_label="vertex") - - self.p3 = p3_graph(data_dir, True) - self.P3 = nx.DiGraph(self.p3, default_label="vertex")