Skip to content

Commit

Permalink
Add another write path that uses the C++ IO adaptor (#1480)
Browse files Browse the repository at this point in the history
* Add write using vineyard c++ adaptor
* Add another write path that uses the C++ IO adaptor
  • Loading branch information
siyuan0322 committed Apr 25, 2022
1 parent 23b612b commit 2bc40ad
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 170 deletions.
213 changes: 140 additions & 73 deletions analytical_engine/core/grape_instance.cc

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class GrapeInstance : public Subscriber {
bl::result<std::string> contextToVineyardDataFrame(
const rpc::GSParams& params);

bl::result<void> outputContext(const rpc::GSParams& params);

bl::result<std::string> output(const rpc::GSParams& params);

bl::result<rpc::graph::GraphDefPb> addColumn(const rpc::GSParams& params);

bl::result<rpc::graph::GraphDefPb> convertGraph(const rpc::GSParams& params);
Expand Down Expand Up @@ -152,6 +156,24 @@ class GrapeInstance : public Subscriber {

bl::result<void> registerGraphType(const rpc::GSParams& params);

/**
 * @brief Extract the selector, vertex range and context wrapper named by a
 * request.
 *
 * The selector and the vertex range are optional: the corresponding output
 * argument is written only when its key is present in params. The context key
 * is always read and is resolved to a wrapper through object_manager_.
 * A failed lookup is handed back to the caller through the BOOST_LEAF_*
 * macros; outputs written before the failure keep their new values.
 *
 * @param params request parameters to read from
 * @param s_selector receives the value stored under rpc::SELECTOR, if present
 * @param range receives parseRange() of the value stored under
 * rpc::VERTEX_RANGE, if present
 * @param wrapper receives the IContextWrapper registered under the value of
 * rpc::CONTEXT_KEY
 * @return an empty result on success
 */
bl::result<void> getContextDetails(
    const rpc::GSParams& params, std::string* s_selector,
    std::pair<std::string, std::string>* range,
    std::shared_ptr<IContextWrapper>* wrapper) {
  // Optional: the selector is forwarded as-is.
  if (params.HasKey(rpc::SELECTOR)) {
    BOOST_LEAF_ASSIGN(*s_selector, params.Get<std::string>(rpc::SELECTOR));
  }

  // Optional: the range arrives as a string and is decoded by parseRange().
  if (params.HasKey(rpc::VERTEX_RANGE)) {
    BOOST_LEAF_AUTO(serialized_range,
                    params.Get<std::string>(rpc::VERTEX_RANGE));
    *range = parseRange(serialized_range);
  }

  // The context key is read unconditionally, then looked up in the object
  // manager.
  BOOST_LEAF_AUTO(ctx_key, params.Get<std::string>(rpc::CONTEXT_KEY));
  BOOST_LEAF_ASSIGN(*wrapper,
                    object_manager_.GetObject<IContextWrapper>(ctx_key));

  return {};
}

static std::string toJson(const std::map<std::string, std::string>& map) {
boost::property_tree::ptree pt;

Expand Down
130 changes: 65 additions & 65 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,71 @@ class FragmentWrapper<ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>>
std::shared_ptr<fragment_t> fragment_;
};

/**
 * @brief A specialized FragmentWrapper for ArrowFlattenedFragment.
 *
 * Holds a shared pointer to the fragment together with its graph definition
 * and exposes both through the IFragmentWrapper interface. Every graph
 * operation declared below (report, copy, direction conversion, graph view)
 * is rejected with vineyard::ErrorCode::kInvalidOperationError.
 */
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class FragmentWrapper<ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>>
: public IFragmentWrapper {
using fragment_t = ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>;

public:
/**
 * @param id identifier forwarded to the IFragmentWrapper base
 * @param graph_def graph definition; taken by value and moved into the wrapper
 * @param fragment the wrapped fragment; moved into the wrapper
 */
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
std::shared_ptr<fragment_t> fragment)
: IFragmentWrapper(id),
graph_def_(std::move(graph_def)),
fragment_(std::move(fragment)) {
// The definition must describe a flattened graph.
// NOTE(review): CHECK_EQ looks like the glog macro, which aborts on
// mismatch -- confirm.
CHECK_EQ(graph_def_.graph_type(), rpc::graph::ARROW_FLATTENED);
}

/** @brief Returns the wrapped fragment as a type-erased shared pointer. */
std::shared_ptr<void> fragment() const override {
return std::static_pointer_cast<void>(fragment_);
}

/** @brief Returns the graph definition stored at construction. */
const rpc::graph::GraphDefPb& graph_def() const override {
return graph_def_;
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented.");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot copy the ArrowFlattenedFragment");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the directed ArrowFlattenedFragment");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the undirected ArrowFlattenedFragment");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidOperationError,
"Cannot generate a graph view over the ArrowFlattenedFragment.");
}

private:
// Graph definition describing the wrapped fragment.
rpc::graph::GraphDefPb graph_def_;
// The wrapped fragment.
std::shared_ptr<fragment_t> fragment_;
};

#ifdef NETWORKX
/**
* @brief A specialized FragmentWrapper for DynamicFragment.
Expand Down Expand Up @@ -979,71 +1044,6 @@ class FragmentWrapper<DynamicProjectedFragment<VDATA_T, EDATA_T>>
std::shared_ptr<fragment_t> fragment_;
};

/**
 * @brief A specialized FragmentWrapper for ArrowFlattenedFragment.
 *
 * Holds a shared pointer to the fragment together with its graph definition
 * and exposes both through the IFragmentWrapper interface. Every graph
 * operation declared below (report, copy, direction conversion, graph view)
 * is rejected with vineyard::ErrorCode::kInvalidOperationError.
 */
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class FragmentWrapper<ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>>
: public IFragmentWrapper {
using fragment_t = ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>;

public:
/**
 * @param id identifier forwarded to the IFragmentWrapper base
 * @param graph_def graph definition; taken by value and moved into the wrapper
 * @param fragment the wrapped fragment; moved into the wrapper
 */
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
std::shared_ptr<fragment_t> fragment)
: IFragmentWrapper(id),
graph_def_(std::move(graph_def)),
fragment_(std::move(fragment)) {
// The definition must describe a flattened graph.
// NOTE(review): CHECK_EQ looks like the glog macro, which aborts on
// mismatch -- confirm.
CHECK_EQ(graph_def_.graph_type(), rpc::graph::ARROW_FLATTENED);
}

/** @brief Returns the wrapped fragment as a type-erased shared pointer. */
std::shared_ptr<void> fragment() const override {
return std::static_pointer_cast<void>(fragment_);
}

/** @brief Returns the graph definition stored at construction. */
const rpc::graph::GraphDefPb& graph_def() const override {
return graph_def_;
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented.");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot copy the ArrowFlattenedFragment");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the directed ArrowFlattenedFragment");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the undirected ArrowFlattenedFragment");
}

/** @brief Not supported: always reports kInvalidOperationError. */
bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidOperationError,
"Cannot generate a graph view over the ArrowFlattenedFragment.");
}

private:
// Graph definition describing the wrapped fragment.
rpc::graph::GraphDefPb graph_def_;
// The wrapped fragment.
std::shared_ptr<fragment_t> fragment_;
};
#endif

} // namespace gs
#endif // ANALYTICAL_ENGINE_CORE_OBJECT_FRAGMENT_WRAPPER_H_
36 changes: 18 additions & 18 deletions analytical_engine/frame/project_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,24 @@ class ProjectSimpleFrame<
}
};

#ifdef NETWORKX
template <typename VDATA_T, typename EDATA_T>
class ProjectSimpleFrame<gs::DynamicProjectedFragment<VDATA_T, EDATA_T>> {
using fragment_t = DynamicFragment;
using projected_fragment_t = gs::DynamicProjectedFragment<VDATA_T, EDATA_T>;
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class ProjectSimpleFrame<
gs::ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>> {
using fragment_t = vineyard::ArrowFragment<OID_T, VID_T>;
using projected_fragment_t =
gs::ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>;

public:
static bl::result<std::shared_ptr<IFragmentWrapper>> Project(
std::shared_ptr<IFragmentWrapper>& input_wrapper,
const std::string& projected_graph_name, const rpc::GSParams& params) {
auto graph_type = input_wrapper->graph_def().graph_type();
if (graph_type != rpc::graph::DYNAMIC_PROPERTY) {
if (graph_type != rpc::graph::ARROW_PROPERTY) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"graph_type should be DYNAMIC_PROPERTY, got " +
"graph_type should be ARROW_PROPERTY, got " +
rpc::graph::GraphTypePb_Name(graph_type));
}

BOOST_LEAF_AUTO(v_prop_key, params.Get<std::string>(rpc::V_PROP_KEY));
BOOST_LEAF_AUTO(e_prop_key, params.Get<std::string>(rpc::E_PROP_KEY));
auto input_frag =
Expand All @@ -156,7 +158,7 @@ class ProjectSimpleFrame<gs::DynamicProjectedFragment<VDATA_T, EDATA_T>> {
rpc::graph::GraphDefPb graph_def;

graph_def.set_key(projected_graph_name);
graph_def.set_graph_type(rpc::graph::DYNAMIC_PROJECTED);
graph_def.set_graph_type(rpc::graph::ARROW_FLATTENED);
gs::rpc::graph::VineyardInfoPb vy_info;
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&vy_info);
Expand All @@ -176,24 +178,22 @@ class ProjectSimpleFrame<gs::DynamicProjectedFragment<VDATA_T, EDATA_T>> {
}
};

template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class ProjectSimpleFrame<
gs::ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>> {
using fragment_t = vineyard::ArrowFragment<OID_T, VID_T>;
using projected_fragment_t =
gs::ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>;
#ifdef NETWORKX
template <typename VDATA_T, typename EDATA_T>
class ProjectSimpleFrame<gs::DynamicProjectedFragment<VDATA_T, EDATA_T>> {
using fragment_t = DynamicFragment;
using projected_fragment_t = gs::DynamicProjectedFragment<VDATA_T, EDATA_T>;

public:
static bl::result<std::shared_ptr<IFragmentWrapper>> Project(
std::shared_ptr<IFragmentWrapper>& input_wrapper,
const std::string& projected_graph_name, const rpc::GSParams& params) {
auto graph_type = input_wrapper->graph_def().graph_type();
if (graph_type != rpc::graph::ARROW_PROPERTY) {
if (graph_type != rpc::graph::DYNAMIC_PROPERTY) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"graph_type should be ARROW_PROPERTY, got " +
"graph_type should be DYNAMIC_PROPERTY, got " +
rpc::graph::GraphTypePb_Name(graph_type));
}

BOOST_LEAF_AUTO(v_prop_key, params.Get<std::string>(rpc::V_PROP_KEY));
BOOST_LEAF_AUTO(e_prop_key, params.Get<std::string>(rpc::E_PROP_KEY));
auto input_frag =
Expand All @@ -204,7 +204,7 @@ class ProjectSimpleFrame<
rpc::graph::GraphDefPb graph_def;

graph_def.set_key(projected_graph_name);
graph_def.set_graph_type(rpc::graph::ARROW_FLATTENED);
graph_def.set_graph_type(rpc::graph::DYNAMIC_PROJECTED);
gs::rpc::graph::VineyardInfoPb vy_info;
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&vy_info);
Expand Down
6 changes: 3 additions & 3 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,8 @@ def run_on_coordinator(
)
if op.op == types_pb2.DATA_SOURCE:
op_result = self._process_data_source(op, dag_bodies, loader_op_bodies)
elif op.op == types_pb2.OUTPUT:
op_result = self._output(op)
elif op.op == types_pb2.DATA_SINK:
op_result = self._process_data_sink(op)
else:
raise RuntimeError("Unsupport op type: " + str(op.op))
response_head.head.results.append(op_result)
Expand Down Expand Up @@ -926,7 +926,7 @@ def _fetch_gremlin_result(self, op: op_def_pb2.OpDef):
result=pickle.dumps(rlt),
)

def _output(self, op: op_def_pb2.OpDef):
def _process_data_sink(self, op: op_def_pb2.OpDef):
import vineyard
import vineyard.io

Expand Down
3 changes: 2 additions & 1 deletion coordinator/gscoordinator/dag_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class DAGManager(object):
types_pb2.GRAPH_TO_DATAFRAME, # need loaded graph to transform selector
types_pb2.TO_VINEYARD_TENSOR, # need loaded graph to transform selector
types_pb2.TO_VINEYARD_DATAFRAME, # need loaded graph to transform selector
types_pb2.OUTPUT, # need loaded graph to transform selector
types_pb2.PROJECT_GRAPH, # need loaded graph to transform selector
types_pb2.PROJECT_TO_SIMPLE, # need loaded graph schema information
types_pb2.ADD_COLUMN, # need ctx result
Expand All @@ -75,7 +76,7 @@ class DAGManager(object):

_coordinator_split_op = [
types_pb2.DATA_SOURCE, # spawn an io stream to read/write data from/to vineyard
types_pb2.OUTPUT, # spawn an io stream to read/write data from/to vineyard
types_pb2.DATA_SINK, # spawn an io stream to read/write data from/to vineyard
]

def __init__(self, request_iterator: Sequence[message_pb2.RunStepRequest]):
Expand Down
13 changes: 9 additions & 4 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901
types_pb2.CONTEXT_TO_DATAFRAME,
types_pb2.TO_VINEYARD_TENSOR,
types_pb2.TO_VINEYARD_DATAFRAME,
types_pb2.OUTPUT,
):
_pre_process_for_context_op(op, op_result_pool, key_to_op, **kwargs)
if op.op in (types_pb2.GRAPH_TO_NUMPY, types_pb2.GRAPH_TO_DATAFRAME):
Expand Down Expand Up @@ -538,8 +539,8 @@ def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901
_pre_process_for_close_learning_instance_op(
op, op_result_pool, key_to_op, **kwargs
)
if op.op == types_pb2.OUTPUT:
_pre_process_for_output_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.DATA_SINK:
_pre_process_for_data_sink_op(op, op_result_pool, key_to_op, **kwargs)
if op.op in (types_pb2.TO_DIRECTED, types_pb2.TO_UNDIRECTED):
_pre_process_for_transform_op(op, op_result_pool, key_to_op, **kwargs)

Expand Down Expand Up @@ -857,7 +858,11 @@ def __backtrack_key_of_graph_op(key):
schema = GraphSchema()
schema.from_graph_def(r.graph_def)
selector = op.attr[types_pb2.SELECTOR].s.decode("utf-8")
if op.op in (types_pb2.CONTEXT_TO_DATAFRAME, types_pb2.TO_VINEYARD_DATAFRAME):
if op.op in (
types_pb2.CONTEXT_TO_DATAFRAME,
types_pb2.TO_VINEYARD_DATAFRAME,
types_pb2.OUTPUT,
):
selector = _tranform_dataframe_selector(context_type, schema, selector)
else:
# to numpy
Expand All @@ -868,7 +873,7 @@ def __backtrack_key_of_graph_op(key):
)


def _pre_process_for_output_op(op, op_result_pool, key_to_op, **kwargs):
def _pre_process_for_data_sink_op(op, op_result_pool, key_to_op, **kwargs):
assert len(op.parents) == 1
key_of_parent_op = op.parents[0]
parent_op = key_to_op[key_of_parent_op]
Expand Down
1 change: 1 addition & 0 deletions proto/graphscope/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ enum OperationType {
CLOSE_LEARNING_INSTANCE = 42;

DATA_SOURCE = 46; // loader
DATA_SINK = 47;

// data
CONTEXT_TO_NUMPY = 50;
Expand Down
18 changes: 16 additions & 2 deletions python/graphscope/framework/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,22 @@ def output(self, fd, selector, vertex_range=None, **kwargs):
Returns:
:class:`graphscope.framework.context.ResultDAGNode`, evaluated in eager mode.
"""
df = self.to_vineyard_dataframe(selector, vertex_range)
op = dag_utils.output(df, fd, **kwargs)
protocol = fd.split("://")[0]
# Still use the stream to write to file,
# as the C++ adaptor in Vineyard requires arrow >= 4.0.0
if protocol in ("file", "hdfs", "hive", "oss", "s3"):
df = self.to_vineyard_dataframe(selector, vertex_range)
op = dag_utils.to_data_sink(df, fd, **kwargs)
else:
check_argument(
isinstance(selector, Mapping), "selector of to_dataframe must be a dict"
)
for _, value in selector.items():
self._check_selector(value)
_ensure_consistent_label(self.context_type, selector)
selector = json.dumps(selector)
vertex_range = utils.transform_vertex_range(vertex_range)
op = dag_utils.output(self, fd, selector, vertex_range, **kwargs)
return ResultDAGNode(self, op)

def __del__(self):
Expand Down
Loading

0 comments on commit 2bc40ad

Please sign in to comment.