Skip to content

Commit

Permalink
feat(analytical): Add the Graph.consolidate_columns() interface in Py…
Browse files Browse the repository at this point in the history
…thon client (#3060)

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow committed Jul 27, 2023
1 parent c585137 commit f590fd1
Show file tree
Hide file tree
Showing 15 changed files with 329 additions and 6 deletions.
34 changes: 34 additions & 0 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,35 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::addLabelsToGraph(
return dst_wrapper->graph_def();
}

bl::result<rpc::graph::GraphDefPb> GrapeInstance::consolidateColumns(
const rpc::GSParams& params) {
BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
BOOST_LEAF_AUTO(label,
params.Get<std::string>(rpc::CONSOLIDATE_COLUMNS_LABEL));
BOOST_LEAF_AUTO(columns,
params.Get<std::string>(rpc::CONSOLIDATE_COLUMNS_COLUMNS));
BOOST_LEAF_AUTO(result_column, params.Get<std::string>(
rpc::CONSOLIDATE_COLUMNS_RESULT_COLUMN));
BOOST_LEAF_AUTO(
src_wrapper,
object_manager_.GetObject<ILabeledFragmentWrapper>(src_graph_name));
if (src_wrapper->graph_def().graph_type() != rpc::graph::ARROW_PROPERTY) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"ConsolidateColumns is only avaiable for ArrowFragment");
}
std::string dst_graph_name = "graph_" + generateId();

VLOG(1) << "Consolidate columns from " << src_graph_name
<< ", graph name: " << dst_graph_name << ":"
<< "\nlabel = " << label << "\ncolumns = " << columns
<< "\nresult_column = " << result_column;
BOOST_LEAF_AUTO(dst_wrapper, src_wrapper->ConsolidateColumns(
comm_spec_, dst_graph_name, label, columns,
result_column));
BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper));
return dst_wrapper->graph_def();
}

bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::graphToNumpy(
const rpc::GSParams& params) {
std::pair<std::string, std::string> range;
Expand Down Expand Up @@ -1413,6 +1442,11 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
r->set_graph_def(graph_def);
break;
}
case rpc::CONSOLIDATE_COLUMNS: {
BOOST_LEAF_AUTO(graph_def, consolidateColumns(params));
r->set_graph_def(graph_def);
break;
}
case rpc::CONTEXT_TO_NUMPY: {
BOOST_LEAF_AUTO(arc, contextToNumpy(params));
r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirst, true);
Expand Down
4 changes: 4 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ class GrapeInstance : public Subscriber {

bl::result<rpc::graph::GraphDefPb> addLabelsToGraph(
const rpc::GSParams& params);

bl::result<rpc::graph::GraphDefPb> consolidateColumns(
const rpc::GSParams& params);

bl::result<std::string> getContextData(const rpc::GSParams& params);

bl::result<std::shared_ptr<grape::InArchive>> graphToNumpy(
Expand Down
62 changes: 62 additions & 0 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class FragmentWrapper<
using fragment_t =
vineyard::ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>;
using label_id_t = typename fragment_t::label_id_t;
using prop_id_t = typename fragment_t::prop_id_t;

public:
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
Expand Down Expand Up @@ -357,6 +358,67 @@ class FragmentWrapper<
return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
}

bl::result<std::shared_ptr<ILabeledFragmentWrapper>> ConsolidateColumns(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& label, const std::string& columns,
const std::string& result_column) override {
auto& schema = fragment_->schema();

label_id_t vertex_label_id = schema.GetVertexLabelId(label);
label_id_t edge_label_id = schema.GetEdgeLabelId(label);

std::vector<std::string> column_names;
boost::split(column_names, columns, boost::is_any_of(",;"));

if (vertex_label_id == -1 && edge_label_id == -1) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"Invalid vertex or edge label: " + label);
}

auto& meta = fragment_->meta();
auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
vineyard::ObjectID new_frag_id = vineyard::InvalidObjectID();
if (vertex_label_id != -1) {
BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateVertexColumns(
*client, vertex_label_id, column_names,
result_column));
} else if (edge_label_id != -1) {
BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateEdgeColumns(
*client, edge_label_id, column_names,
result_column));
}

VINEYARD_CHECK_OK(client->Persist(new_frag_id));
BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
*client, new_frag_id, comm_spec));
auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
client->GetObject(frag_group_id));
auto new_frag = client->GetObject<fragment_t>(new_frag_id);

rpc::graph::GraphDefPb new_graph_def;

new_graph_def.set_key(dst_graph_name);
new_graph_def.set_compact_edges(new_frag->compact_edges());
new_graph_def.set_use_perfect_hash(new_frag->use_perfect_hash());

gs::rpc::graph::VineyardInfoPb vy_info;
if (graph_def_.has_extension()) {
graph_def_.extension().UnpackTo(&vy_info);
}
vy_info.set_vineyard_id(frag_group_id);
vy_info.clear_fragments();
for (auto const& item : fg->Fragments()) {
vy_info.add_fragments(item.second);
}
new_graph_def.mutable_extension()->PackFrom(vy_info);

set_graph_def(new_frag, new_graph_def);

auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
dst_graph_name, new_graph_def, new_frag);
return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
}

bl::result<std::shared_ptr<ILabeledFragmentWrapper>> AddColumn(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
std::shared_ptr<IContextWrapper>& ctx_wrapper,
Expand Down
6 changes: 6 additions & 0 deletions analytical_engine/core/object/i_fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ class ILabeledFragmentWrapper : public IFragmentWrapper {
const std::map<int, std::vector<int>>& vertices,
const std::map<int, std::vector<int>>& edges) = 0;

virtual bl::result<std::shared_ptr<ILabeledFragmentWrapper>>
ConsolidateColumns(const grape::CommSpec& comm_spec,
const std::string& dst_graph_name,
const std::string& label, const std::string& columns,
const std::string& result_column) = 0;

virtual bl::result<std::shared_ptr<ILabeledFragmentWrapper>> AddColumn(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
std::shared_ptr<IContextWrapper>& ctx_wrapper,
Expand Down
1 change: 1 addition & 0 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id,
graph_name, graph_def, frag);
return std::dynamic_pointer_cast<gs::IFragmentWrapper>(wrapper);
}

} // namespace detail

/**
Expand Down
1 change: 1 addition & 0 deletions coordinator/gscoordinator/dag_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class DAGManager(object):
types_pb2.CREATE_GRAPH, # spawn an io stream to read/write data from/to vineyard
types_pb2.BIND_APP, # need loaded graph to compile
types_pb2.ADD_LABELS, # need loaded graph
types_pb2.CONSOLIDATE_COLUMNS, # need loaded graph to transform selector
types_pb2.RUN_APP, # need loaded app
types_pb2.CONTEXT_TO_NUMPY, # need loaded graph to transform selector
types_pb2.CONTEXT_TO_DATAFRAME, # need loaded graph to transform selector
Expand Down
8 changes: 7 additions & 1 deletion coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ def pre_process(self, dag_def, dag_bodies, loader_op_bodies):
)

# Handle op that depends on loader (data source)
if op.op == types_pb2.CREATE_GRAPH or op.op == types_pb2.ADD_LABELS:
if op.op in [
types_pb2.CREATE_GRAPH,
types_pb2.CONSOLIDATE_COLUMNS,
types_pb2.ADD_LABELS,
]:
for key_of_parent_op in op.parents:
parent_op = self._key_to_op[key_of_parent_op]
if parent_op.op == types_pb2.DATA_SOURCE:
Expand All @@ -153,6 +157,7 @@ def pre_process(self, dag_def, dag_bodies, loader_op_bodies):
)
or op.op == types_pb2.TRANSFORM_GRAPH
or op.op == types_pb2.PROJECT_TO_SIMPLE
or op.op == types_pb2.CONSOLIDATE_COLUMNS
or op.op == types_pb2.ADD_LABELS
or op.op == types_pb2.ARCHIVE_GRAPH
):
Expand Down Expand Up @@ -187,6 +192,7 @@ def post_process(self, response_head, response_bodies):
if op.op in (
types_pb2.CREATE_GRAPH,
types_pb2.PROJECT_GRAPH,
types_pb2.CONSOLIDATE_COLUMNS,
types_pb2.PROJECT_TO_SIMPLE,
types_pb2.TRANSFORM_GRAPH,
types_pb2.ADD_LABELS,
Expand Down
13 changes: 13 additions & 0 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901
_pre_process_for_bind_app_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.PROJECT_GRAPH:
_pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.CONSOLIDATE_COLUMNS:
_pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.PROJECT_TO_SIMPLE:
_pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.ADD_COLUMN:
Expand Down Expand Up @@ -1261,6 +1263,17 @@ def _get_all_e_props_id(schema, label):
del op.attr[types_pb2.EDGE_COLLECTIONS]


def _pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
# get parent graph schema
key_of_parent_op = op.parents[0]
r = op_result_pool[key_of_parent_op]
graph_name = r.graph_def.key
op.attr[types_pb2.GRAPH_NAME].CopyFrom(
attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore"))
)


def _pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Graph object

.. autoclass:: GraphDAGNode
:special-members: __init__
:members: add_vertices, add_edges, add_column, project, unload
:members: add_vertices, add_edges, add_column, project, unload, consolidate_columns

.. autoclass:: Graph
:special-members: __init__
Expand Down
4 changes: 4 additions & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ enum OperationType {
ARCHIVE_GRAPH = 24; // archive graph
SERIALIZE_GRAPH = 25; // serialize graph
DESERIALIZE_GRAPH = 26; // desrialize graph
CONSOLIDATE_COLUMNS = 27; // consolidate property columns in the graph

SUBGRAPH = 32; // subgraph in interactive query

Expand Down Expand Up @@ -182,6 +183,9 @@ enum ParamKey {
VERTEX_MAP_TYPE = 45;
COMPACT_EDGES = 46;
USE_PERFECT_HASH = 47;
CONSOLIDATE_COLUMNS_LABEL = 48;
CONSOLIDATE_COLUMNS_COLUMNS = 49;
CONSOLIDATE_COLUMNS_RESULT_COLUMN = 50;

// project
VERTEX_COLLECTIONS = 51;
Expand Down
53 changes: 53 additions & 0 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import json
import pickle
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

from graphscope.framework import utils
from graphscope.framework.errors import check_argument
Expand Down Expand Up @@ -224,6 +228,55 @@ def add_labels_to_graph(graph, loader_op):
return op


def consolidate_columns(
graph,
label: str,
columns: Union[List[str], Tuple[str]],
result_column: str,
):
"""Consolidate property columns in the graph.
Args:
graph (:class:`Graph`)
label (str): The label of the vertex/edge to be consolidated.
columns: The columns to be consolidated.
result_column: The column name of the result.
Returns:
Operation
"""
check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY)
config = {
types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
types_pb2.RETAIN_OID: utils.b_to_attr(graph._retain_oid),
types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
types_pb2.CONSOLIDATE_COLUMNS_LABEL: utils.s_to_attr(label),
types_pb2.CONSOLIDATE_COLUMNS_COLUMNS: utils.s_to_attr(",".join(columns)),
types_pb2.CONSOLIDATE_COLUMNS_RESULT_COLUMN: utils.s_to_attr(result_column),
}

# The key maybe filled later in coordinator
if hasattr(graph, "key"):
config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)

op = Operation(
graph.session_id,
types_pb2.CONSOLIDATE_COLUMNS,
config=config,
inputs=[graph.op],
output_types=types_pb2.GRAPH,
)
return op


def dynamic_to_arrow(graph):
"""Create an op to transform a :class:`nx.Graph` object to :class:`Graph`.
Expand Down
Loading

0 comments on commit f590fd1

Please sign in to comment.