Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(analytical): Add the Graph.consolidate_columns() interface in Python client #3060

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,35 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::addLabelsToGraph(
return dst_wrapper->graph_def();
}

// Handles a CONSOLIDATE_COLUMNS request.
//
// Reads the source graph name, the label, the column list and the result
// column name from `params`, asks the source graph's wrapper to build a new
// graph with those columns consolidated, registers the new wrapper in the
// object manager and returns the new graph's definition.
//
// Errors raised while reading the parameters, looking up the source graph,
// consolidating the columns or storing the result are propagated to the
// caller. A source graph whose type is not ARROW_PROPERTY is rejected with
// kInvalidOperationError.
bl::result<rpc::graph::GraphDefPb> GrapeInstance::consolidateColumns(
    const rpc::GSParams& params) {
  BOOST_LEAF_AUTO(src_graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
  BOOST_LEAF_AUTO(label,
                  params.Get<std::string>(rpc::CONSOLIDATE_COLUMNS_LABEL));
  BOOST_LEAF_AUTO(columns,
                  params.Get<std::string>(rpc::CONSOLIDATE_COLUMNS_COLUMNS));
  BOOST_LEAF_AUTO(result_column, params.Get<std::string>(
                                     rpc::CONSOLIDATE_COLUMNS_RESULT_COLUMN));
  BOOST_LEAF_AUTO(
      src_wrapper,
      object_manager_.GetObject<ILabeledFragmentWrapper>(src_graph_name));
  if (src_wrapper->graph_def().graph_type() != rpc::graph::ARROW_PROPERTY) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "ConsolidateColumns is only available for ArrowFragment");
  }
  // The result is stored as a new graph under a freshly generated name.
  std::string dst_graph_name = "graph_" + generateId();

  VLOG(1) << "Consolidate columns from " << src_graph_name
          << ", graph name: " << dst_graph_name << ":"
          << "\nlabel = " << label << "\ncolumns = " << columns
          << "\nresult_column = " << result_column;
  BOOST_LEAF_AUTO(dst_wrapper, src_wrapper->ConsolidateColumns(
                                   comm_spec_, dst_graph_name, label, columns,
                                   result_column));
  BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper));
  return dst_wrapper->graph_def();
}

bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::graphToNumpy(
const rpc::GSParams& params) {
std::pair<std::string, std::string> range;
Expand Down Expand Up @@ -1413,6 +1442,11 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
r->set_graph_def(graph_def);
break;
}
case rpc::CONSOLIDATE_COLUMNS: {
BOOST_LEAF_AUTO(graph_def, consolidateColumns(params));
r->set_graph_def(graph_def);
break;
}
case rpc::CONTEXT_TO_NUMPY: {
BOOST_LEAF_AUTO(arc, contextToNumpy(params));
r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirst, true);
Expand Down
4 changes: 4 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ class GrapeInstance : public Subscriber {

bl::result<rpc::graph::GraphDefPb> addLabelsToGraph(
const rpc::GSParams& params);

// Handles rpc::CONSOLIDATE_COLUMNS requests (dispatched from OnReceive):
// consolidates property columns of the graph named in `params` and returns
// the definition of the resulting graph.
bl::result<rpc::graph::GraphDefPb> consolidateColumns(
const rpc::GSParams& params);

bl::result<std::string> getContextData(const rpc::GSParams& params);

bl::result<std::shared_ptr<grape::InArchive>> graphToNumpy(
Expand Down
62 changes: 62 additions & 0 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class FragmentWrapper<
using fragment_t =
vineyard::ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>;
using label_id_t = typename fragment_t::label_id_t;
using prop_id_t = typename fragment_t::prop_id_t;

public:
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
Expand Down Expand Up @@ -357,6 +358,67 @@ class FragmentWrapper<
return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
}

// Builds a new graph in which several property columns of one vertex or edge
// label are consolidated into a single column named `result_column`.
//
// `label` is looked up first as a vertex label and then as an edge label; if
// it names both, the vertex label takes precedence. `columns` is a list of
// column names separated by ',' or ';'.
//
// Returns a wrapper for the new graph, keyed by `dst_graph_name`. Fails with
// kInvalidValueError when `label` is neither a vertex nor an edge label, and
// with kInvalidOperationError when the vineyard client, the fragment group or
// the new fragment cannot be obtained. Errors from the underlying
// consolidation and fragment-group construction are propagated.
bl::result<std::shared_ptr<ILabeledFragmentWrapper>> ConsolidateColumns(
    const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
    const std::string& label, const std::string& columns,
    const std::string& result_column) override {
  auto& schema = fragment_->schema();

  label_id_t vertex_label_id = schema.GetVertexLabelId(label);
  label_id_t edge_label_id = schema.GetEdgeLabelId(label);
  if (vertex_label_id == -1 && edge_label_id == -1) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "Invalid vertex or edge label: " + label);
  }

  std::vector<std::string> column_names;
  boost::split(column_names, columns, boost::is_any_of(",;"));

  auto& meta = fragment_->meta();
  auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
  if (client == nullptr) {
    // The cast fails when the fragment's client is not a vineyard::Client;
    // dereferencing it below would be undefined behavior.
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "ConsolidateColumns: the fragment is not associated with "
                    "a vineyard::Client");
  }

  vineyard::ObjectID new_frag_id = vineyard::InvalidObjectID();
  if (vertex_label_id != -1) {
    BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateVertexColumns(
                                       *client, vertex_label_id, column_names,
                                       result_column));
  } else {
    // The validation above guarantees that the edge label id is valid here.
    BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateEdgeColumns(
                                       *client, edge_label_id, column_names,
                                       result_column));
  }

  // NOTE(review): VINEYARD_CHECK_OK presumably aborts on failure instead of
  // returning an error to the caller -- confirm that this is intended.
  VINEYARD_CHECK_OK(client->Persist(new_frag_id));
  BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
                                     *client, new_frag_id, comm_spec));
  auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
      client->GetObject(frag_group_id));
  if (fg == nullptr) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "ConsolidateColumns: failed to get the fragment group of "
                    "graph " +
                        dst_graph_name);
  }
  auto new_frag = client->GetObject<fragment_t>(new_frag_id);
  if (new_frag == nullptr) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "ConsolidateColumns: failed to get the consolidated "
                    "fragment of graph " +
                        dst_graph_name);
  }

  rpc::graph::GraphDefPb new_graph_def;

  new_graph_def.set_key(dst_graph_name);
  new_graph_def.set_compact_edges(new_frag->compact_edges());
  new_graph_def.set_use_perfect_hash(new_frag->use_perfect_hash());

  // Start from the source graph's vineyard info (when present) and point it
  // at the new fragment group and its fragments.
  gs::rpc::graph::VineyardInfoPb vy_info;
  if (graph_def_.has_extension()) {
    graph_def_.extension().UnpackTo(&vy_info);
  }
  vy_info.set_vineyard_id(frag_group_id);
  vy_info.clear_fragments();
  for (auto const& item : fg->Fragments()) {
    vy_info.add_fragments(item.second);
  }
  new_graph_def.mutable_extension()->PackFrom(vy_info);

  set_graph_def(new_frag, new_graph_def);

  auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
      dst_graph_name, new_graph_def, new_frag);
  return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
}

bl::result<std::shared_ptr<ILabeledFragmentWrapper>> AddColumn(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
std::shared_ptr<IContextWrapper>& ctx_wrapper,
Expand Down
6 changes: 6 additions & 0 deletions analytical_engine/core/object/i_fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ class ILabeledFragmentWrapper : public IFragmentWrapper {
const std::map<int, std::vector<int>>& vertices,
const std::map<int, std::vector<int>>& edges) = 0;

// Consolidates the property columns `columns` of the vertex or edge label
// `label` into a single column `result_column`, and returns a wrapper for the
// resulting graph, which is keyed by `dst_graph_name`.
// `columns` carries all column names in one string; the ArrowFragment
// implementation splits it on ',' and ';'.
virtual bl::result<std::shared_ptr<ILabeledFragmentWrapper>>
ConsolidateColumns(const grape::CommSpec& comm_spec,
const std::string& dst_graph_name,
const std::string& label, const std::string& columns,
const std::string& result_column) = 0;

virtual bl::result<std::shared_ptr<ILabeledFragmentWrapper>> AddColumn(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
std::shared_ptr<IContextWrapper>& ctx_wrapper,
Expand Down
1 change: 1 addition & 0 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id,
graph_name, graph_def, frag);
return std::dynamic_pointer_cast<gs::IFragmentWrapper>(wrapper);
}

} // namespace detail

/**
Expand Down
1 change: 1 addition & 0 deletions coordinator/gscoordinator/dag_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class DAGManager(object):
types_pb2.CREATE_GRAPH, # spawn an io stream to read/write data from/to vineyard
types_pb2.BIND_APP, # need loaded graph to compile
types_pb2.ADD_LABELS, # need loaded graph
types_pb2.CONSOLIDATE_COLUMNS, # need loaded graph to fill in the graph name
types_pb2.RUN_APP, # need loaded app
types_pb2.CONTEXT_TO_NUMPY, # need loaded graph to transform selector
types_pb2.CONTEXT_TO_DATAFRAME, # need loaded graph to transform selector
Expand Down
8 changes: 7 additions & 1 deletion coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ def pre_process(self, dag_def, dag_bodies, loader_op_bodies):
)

# Handle op that depends on loader (data source)
if op.op == types_pb2.CREATE_GRAPH or op.op == types_pb2.ADD_LABELS:
if op.op in [
types_pb2.CREATE_GRAPH,
types_pb2.CONSOLIDATE_COLUMNS,
types_pb2.ADD_LABELS,
]:
for key_of_parent_op in op.parents:
parent_op = self._key_to_op[key_of_parent_op]
if parent_op.op == types_pb2.DATA_SOURCE:
Expand All @@ -153,6 +157,7 @@ def pre_process(self, dag_def, dag_bodies, loader_op_bodies):
)
or op.op == types_pb2.TRANSFORM_GRAPH
or op.op == types_pb2.PROJECT_TO_SIMPLE
or op.op == types_pb2.CONSOLIDATE_COLUMNS
or op.op == types_pb2.ADD_LABELS
or op.op == types_pb2.ARCHIVE_GRAPH
):
Expand Down Expand Up @@ -187,6 +192,7 @@ def post_process(self, response_head, response_bodies):
if op.op in (
types_pb2.CREATE_GRAPH,
types_pb2.PROJECT_GRAPH,
types_pb2.CONSOLIDATE_COLUMNS,
types_pb2.PROJECT_TO_SIMPLE,
types_pb2.TRANSFORM_GRAPH,
types_pb2.ADD_LABELS,
Expand Down
13 changes: 13 additions & 0 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901
_pre_process_for_bind_app_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.PROJECT_GRAPH:
_pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.CONSOLIDATE_COLUMNS:
_pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.PROJECT_TO_SIMPLE:
_pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.ADD_COLUMN:
Expand Down Expand Up @@ -1261,6 +1263,17 @@ def _get_all_e_props_id(schema, label):
del op.attr[types_pb2.EDGE_COLLECTIONS]


def _pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs):
    """Fill in the GRAPH_NAME attribute of a CONSOLIDATE_COLUMNS op.

    The op must have exactly one parent. The key of the graph produced by
    that parent is read from ``op_result_pool`` and stored, UTF-8 encoded,
    in ``op.attr[types_pb2.GRAPH_NAME]``.

    ``key_to_op`` and ``kwargs`` are accepted but not used here.
    """
    assert len(op.parents) == 1
    # The result of the single parent op carries the source graph's definition.
    parent_result = op_result_pool[op.parents[0]]
    encoded_name = parent_result.graph_def.key.encode("utf-8", errors="ignore")
    name_attr = attr_value_pb2.AttrValue(s=encoded_name)
    op.attr[types_pb2.GRAPH_NAME].CopyFrom(name_attr)


def _pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Graph object

.. autoclass:: GraphDAGNode
:special-members: __init__
:members: add_vertices, add_edges, add_column, project, unload
:members: add_vertices, add_edges, add_column, project, unload, consolidate_columns

.. autoclass:: Graph
:special-members: __init__
Expand Down
4 changes: 4 additions & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ enum OperationType {
ARCHIVE_GRAPH = 24; // archive graph
SERIALIZE_GRAPH = 25; // serialize graph
DESERIALIZE_GRAPH = 26; // deserialize graph
CONSOLIDATE_COLUMNS = 27; // consolidate property columns in the graph

SUBGRAPH = 32; // subgraph in interactive query

Expand Down Expand Up @@ -182,6 +183,9 @@ enum ParamKey {
VERTEX_MAP_TYPE = 45;
COMPACT_EDGES = 46;
USE_PERFECT_HASH = 47;
CONSOLIDATE_COLUMNS_LABEL = 48;
CONSOLIDATE_COLUMNS_COLUMNS = 49;
CONSOLIDATE_COLUMNS_RESULT_COLUMN = 50;

// project
VERTEX_COLLECTIONS = 51;
Expand Down
53 changes: 53 additions & 0 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import json
import pickle
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

from graphscope.framework import utils
from graphscope.framework.errors import check_argument
Expand Down Expand Up @@ -224,6 +228,55 @@ def add_labels_to_graph(graph, loader_op):
return op


def consolidate_columns(
    graph,
    label: str,
    columns: Union[List[str], Tuple[str, ...]],
    result_column: str,
):
    """Create an op that consolidates property columns in the graph.

    Args:
        graph (:class:`Graph`): The source graph. Its graph type must be
            ``ARROW_PROPERTY``.
        label (str): The label of the vertex/edge to be consolidated.
        columns: The names of the columns to be consolidated. They are sent
            to the engine as one comma-separated string. A bare string is
            tolerated and treated as a single column name.
        result_column (str): The column name of the result.

    Returns:
        Operation: An op of type ``CONSOLIDATE_COLUMNS`` whose output is a
        graph.
    """
    check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY)
    # A str is itself an iterable of characters, so joining it directly would
    # silently turn "age" into "a,g,e". Treat it as one column name instead.
    if isinstance(columns, str):
        columns = [columns]
    config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
        types_pb2.RETAIN_OID: utils.b_to_attr(graph._retain_oid),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
        types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
        types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
        types_pb2.CONSOLIDATE_COLUMNS_LABEL: utils.s_to_attr(label),
        types_pb2.CONSOLIDATE_COLUMNS_COLUMNS: utils.s_to_attr(",".join(columns)),
        types_pb2.CONSOLIDATE_COLUMNS_RESULT_COLUMN: utils.s_to_attr(result_column),
    }

    # The key maybe filled later in coordinator
    if hasattr(graph, "key"):
        config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)

    op = Operation(
        graph.session_id,
        types_pb2.CONSOLIDATE_COLUMNS,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
    return op


def dynamic_to_arrow(graph):
"""Create an op to transform a :class:`nx.Graph` object to :class:`Graph`.

Expand Down
Loading
Loading