Skip to content

Commit

Permalink
[FEAT][GraphAr] Add API to python client to enable loading graph from…
Browse files Browse the repository at this point in the history
… gar files or archive graph to gar files (alibaba#2588)

## What do these changes do?

- Add APIs to the Python client for loading a graph from GAR files and
archiving a graph to GAR files

## Related issue number
issue apache/incubator-graphar#41
  • Loading branch information
acezen committed Mar 13, 2024
1 parent 8c98bc5 commit f42ca77
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 20 deletions.
4 changes: 4 additions & 0 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ if (ENABLE_JAVA_SDK)
target_compile_definitions(grape_engine PUBLIC ENABLE_JAVA_SDK)
endif()

# Expose ENABLE_GAR to grape_engine sources when BUILD_VINEYARD_GRAPH_WITH_GAR
# is set.
if (BUILD_VINEYARD_GRAPH_WITH_GAR)
    target_compile_definitions(grape_engine PRIVATE ENABLE_GAR)
endif ()

if (LET_IT_CRASH_ON_EXCEPTION)
target_compile_definitions(grape_engine PRIVATE LET_IT_CRASH_ON_EXCEPTION)
endif ()
Expand Down
19 changes: 19 additions & 0 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ bl::result<void> GrapeInstance::unloadGraph(const rpc::GSParams& params) {
return object_manager_.RemoveObject(graph_name);
}

/**
 * @brief Archive the fragment group referenced by the request parameters.
 *
 * When the parameters carry rpc::VINEYARD_ID, the PropertyGraphUtils object
 * registered under rpc::TYPE_SIGNATURE is looked up and asked to archive the
 * fragment group, provided the vineyard client reports that the object exists.
 *
 * @param params Request parameters; reads rpc::VINEYARD_ID and
 *               rpc::TYPE_SIGNATURE here and is forwarded to the archiver.
 * @return An empty result on success, or the error produced by parameter
 *         lookup, the object manager, the vineyard client or the archiver.
 *
 * NOTE(review): a request without rpc::VINEYARD_ID, or one whose object does
 * not exist in vineyard, still returns success without archiving anything --
 * confirm that this silent no-op is intended.
 */
bl::result<void> GrapeInstance::archiveGraph(const rpc::GSParams& params) {
  if (params.HasKey(rpc::VINEYARD_ID)) {
    BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE));
    BOOST_LEAF_AUTO(frag_group_id, params.Get<int64_t>(rpc::VINEYARD_ID));
    BOOST_LEAF_AUTO(graph_utils,
                    object_manager_.GetObject<PropertyGraphUtils>(type_sig));
    bool exists = false;
    VY_OK_OR_RAISE(client_->Exists(frag_group_id, exists));
    if (exists) {
      // Propagate archive failures instead of dropping the returned result;
      // otherwise a failed write would be reported to the caller as success.
      BOOST_LEAF_CHECK(graph_utils->ArchiveGraph(frag_group_id, comm_spec_,
                                                 *client_, params));
    }
  }
  return {};
}

bl::result<std::string> GrapeInstance::loadApp(const rpc::GSParams& params) {
BOOST_LEAF_AUTO(algo_name, params.Get<std::string>(rpc::APP_ALGO));
std::string app_name = "app_" + algo_name + "_" + generateId();
Expand Down Expand Up @@ -1272,6 +1287,10 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
true);
break;
}
case rpc::ARCHIVE_GRAPH: {
BOOST_LEAF_CHECK(archiveGraph(params));
break;
}
case rpc::PROJECT_GRAPH: {
BOOST_LEAF_AUTO(graph_def, projectGraph(params));
r->set_graph_def(graph_def);
Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class GrapeInstance : public Subscriber {

bl::result<void> unloadGraph(const rpc::GSParams& params);

bl::result<void> archiveGraph(const rpc::GSParams& params);

bl::result<std::string> loadApp(const rpc::GSParams& params);

bl::result<void> unloadApp(const rpc::GSParams& params);
Expand Down
25 changes: 24 additions & 1 deletion analytical_engine/core/object/graph_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ typedef void LoadGraphT(
const std::string& graph_name, const rpc::GSParams& params,
bl::result<std::shared_ptr<IFragmentWrapper>>& fragment_wrapper);

// Function type used for the "ArchiveGraph" symbol of a property graph frame
// library. The function reports its outcome through `result_out` rather than
// through a return value.
typedef void ArchiveGraphT(vineyard::ObjectID frag_id,
const grape::CommSpec& comm_spec,
vineyard::Client& client,
const gs::rpc::GSParams& params,
bl::result<void>& result_out);

typedef void AddLabelsToGraphT(
vineyard::ObjectID frag_id, const grape::CommSpec& comm_spec,
vineyard::Client& client, const std::string& graph_name,
Expand All @@ -68,7 +74,7 @@ typedef void ToDynamicFragmentT(
/**
* @brief PropertyGraphUtils is an invoker of property_graph_frame library. This
* utility provides these methods to manipulate ArrowFragment: LoadGraph,
* ToArrowFragment and ToDynamicFragment.
* ArchiveGraph, ToArrowFragment and ToDynamicFragment.
*/
class PropertyGraphUtils : public GSObject {
public:
Expand All @@ -77,6 +83,7 @@ class PropertyGraphUtils : public GSObject {
lib_path_(std::move(lib_path)),
dl_handle_(nullptr),
load_graph_(nullptr),
archive_graph_(nullptr),
add_labels_to_graph_(nullptr),
to_arrow_fragment_(nullptr),
to_dynamic_fragment_(nullptr) {}
Expand All @@ -87,6 +94,11 @@ class PropertyGraphUtils : public GSObject {
BOOST_LEAF_AUTO(p_fun, get_func_ptr(lib_path_, dl_handle_, "LoadGraph"));
load_graph_ = reinterpret_cast<LoadGraphT*>(p_fun);
}
{
BOOST_LEAF_AUTO(p_fun,
get_func_ptr(lib_path_, dl_handle_, "ArchiveGraph"));
archive_graph_ = reinterpret_cast<ArchiveGraphT*>(p_fun);
}
{
BOOST_LEAF_AUTO(p_fun,
get_func_ptr(lib_path_, dl_handle_, "AddLabelsToGraph"));
Expand Down Expand Up @@ -114,6 +126,16 @@ class PropertyGraphUtils : public GSObject {
return wrapper;
}

/**
 * @brief Call the frame library's ArchiveGraph function.
 *
 * @param frag_id   Object id forwarded to the library function.
 * @param comm_spec Communication spec forwarded to the library function.
 * @param client    Vineyard client forwarded to the library function.
 * @param params    Request parameters forwarded to the library function.
 * @return The result that the library function stored in its out-parameter.
 */
bl::result<void> ArchiveGraph(vineyard::ObjectID frag_id,
                              const grape::CommSpec& comm_spec,
                              vineyard::Client& client,
                              const rpc::GSParams& params) {
  // The library function has no return value; it fills this slot instead.
  bl::result<void> status;
  archive_graph_(frag_id, comm_spec, client, params, status);
  return status;
}

bl::result<std::shared_ptr<IFragmentWrapper>> AddLabelsToGraph(
vineyard::ObjectID frag_id, const grape::CommSpec& comm_spec,
vineyard::Client& client, const std::string& graph_name,
Expand Down Expand Up @@ -162,6 +184,7 @@ class PropertyGraphUtils : public GSObject {
std::string lib_path_;
void* dl_handle_;
LoadGraphT* load_graph_;
ArchiveGraphT* archive_graph_;
AddLabelsToGraphT* add_labels_to_graph_;
ToArrowFragmentT* to_arrow_fragment_;
ToDynamicFragmentT* to_dynamic_fragment_;
Expand Down
89 changes: 73 additions & 16 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "vineyard/client/client.h"
#include "vineyard/graph/fragment/arrow_fragment.h"
#include "vineyard/graph/loader/fragment_loader_utils.h"
#include "vineyard/graph/loader/gar_fragment_loader.h"
#include "vineyard/graph/writer/arrow_fragment_writer.h"

#include "core/config.h"
#include "core/error.h"
Expand Down Expand Up @@ -95,20 +97,45 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
graph_name, graph_def, frag);
return std::dynamic_pointer_cast<gs::IFragmentWrapper>(wrapper);
} else {
BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params));
using loader_t = gs::arrow_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
loader_t loader(client, comm_spec, graph_info);
vineyard::ObjectID frag_group_id = vineyard::InvalidObjectID();
bool generate_eid = false;
bool retain_oid = false;
bool from_gar = params.HasKey(gs::rpc::IS_FROM_GAR)
? params.Get<bool>(gs::rpc::IS_FROM_GAR).value()
: false;
if (from_gar) {
#ifdef ENABLE_GAR
BOOST_LEAF_AUTO(graph_info_path,
params.Get<std::string>(gs::rpc::GRAPH_INFO_PATH));
BOOST_LEAF_ASSIGN(generate_eid, params.Get<bool>(gs::rpc::GENERATE_EID));
BOOST_LEAF_ASSIGN(retain_oid, params.Get<bool>(gs::rpc::RETAIN_OID));
using loader_t =
vineyard::gar_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
loader_t loader(client, comm_spec, graph_info_path);
MPI_Barrier(comm_spec.comm());
BOOST_LEAF_ASSIGN(frag_group_id, loader.LoadFragmentAsFragmentGroup());
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"The vineyard is not compiled with GAR support");
#endif
} else {
BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params));
using loader_t = gs::arrow_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
loader_t loader(client, comm_spec, graph_info);

MPI_Barrier(comm_spec.comm());
{
vineyard::json __dummy;
VINEYARD_DISCARD(
client.GetData(vineyard::InvalidObjectID(), __dummy, true, false));
}

MPI_Barrier(comm_spec.comm());
{
vineyard::json __dummy;
VINEYARD_DISCARD(
client.GetData(vineyard::InvalidObjectID(), __dummy, true, false));
BOOST_LEAF_ASSIGN(frag_group_id, loader.LoadFragmentAsFragmentGroup());
generate_eid = graph_info->generate_eid;
retain_oid = graph_info->retain_oid;
}

BOOST_LEAF_AUTO(frag_group_id, loader.LoadFragmentAsFragmentGroup());
MPI_Barrier(comm_spec.comm());

LOG_IF(INFO, comm_spec.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-100";

Expand Down Expand Up @@ -138,8 +165,8 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
for (auto const& item : fg->Fragments()) {
vy_info.add_fragments(item.second);
}
vy_info.set_generate_eid(graph_info->generate_eid);
vy_info.set_retain_oid(graph_info->retain_oid);
vy_info.set_generate_eid(generate_eid);
vy_info.set_retain_oid(retain_oid);
graph_def.mutable_extension()->PackFrom(vy_info);
gs::set_graph_def(frag, graph_def);

Expand All @@ -149,6 +176,30 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
}
}

/**
 * @brief Write this worker's fragment of a fragment group through
 *        vineyard::ArrowFragmentWriter.
 *
 * Only available when the frame is compiled with ENABLE_GAR; otherwise an
 * error is returned.
 *
 * @param frag_group_id Object id of the vineyard::ArrowFragmentGroup.
 * @param comm_spec     Communication spec; selects the fragment of this worker.
 * @param client        Vineyard client used to fetch the group and fragment.
 * @param params        Request parameters; must contain
 *                      gs::rpc::GRAPH_INFO_PATH.
 * @return An empty result on success, otherwise an error describing the
 *         missing parameter, the unresolved object or the failed write.
 */
__attribute__((visibility("hidden"))) static bl::result<void> ArchiveGraph(
    vineyard::ObjectID frag_group_id, const grape::CommSpec& comm_spec,
    vineyard::Client& client, const gs::rpc::GSParams& params) {
#ifdef ENABLE_GAR
  BOOST_LEAF_AUTO(graph_info_path,
                  params.Get<std::string>(gs::rpc::GRAPH_INFO_PATH));

  auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
      client.GetObject(frag_group_id));
  // dynamic_pointer_cast yields nullptr when the object is missing or is not
  // a fragment group; report that instead of dereferencing a null pointer.
  if (fg == nullptr) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "The object to archive is not an ArrowFragmentGroup");
  }

  auto fid = comm_spec.WorkerToFrag(comm_spec.worker_id());
  // Look the fragment up explicitly so that a missing entry becomes an error
  // result rather than a std::out_of_range exception from at().
  auto const& fragments = fg->Fragments();
  auto frag_entry = fragments.find(fid);
  if (frag_entry == fragments.end()) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "The fragment group has no fragment for this worker");
  }

  auto frag = std::static_pointer_cast<_GRAPH_TYPE>(
      client.GetObject(frag_entry->second));
  if (frag == nullptr) {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                    "Failed to get the fragment to archive from vineyard");
  }

  using archive_t = vineyard::ArrowFragmentWriter<_GRAPH_TYPE>;
  archive_t archive(frag, comm_spec, graph_info_path);
  // Surface write failures to the caller instead of discarding them.
  BOOST_LEAF_CHECK(archive.WriteFragment());

  return {};
#else
  RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                  "The vineyard is not compiled with GAR support");
#endif
}

__attribute__((visibility(
"hidden"))) static bl::result<std::shared_ptr<gs::IFragmentWrapper>>
ToArrowFragment(vineyard::Client& client, const grape::CommSpec& comm_spec,
Expand Down Expand Up @@ -317,10 +368,10 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id,

/**
* property_graph_frame.cc serves as a frame to be compiled with ArrowFragment.
* LoadGraph, ToArrowFragment, and ToDynamicFragment functions are provided to
* proceed with corresponding operations. The frame only needs one macro
* _GRAPH_TYPE to present which specialized ArrowFragment type will be injected
* into the frame.
* LoadGraph, ArchiveGraph, ToArrowFragment, and ToDynamicFragment functions
* are provided to proceed with corresponding operations. The frame only needs
* one macro _GRAPH_TYPE to present which specialized ArrowFragment type will be
* injected into the frame.
*/
extern "C" {

Expand All @@ -333,6 +384,12 @@ void LoadGraph(
detail::LoadGraph(comm_spec, client, graph_name, params));
}

// C-linkage entry point of the frame for archiving a graph. Forwards all
// arguments to detail::ArchiveGraph and stores its result in `result_out`.
// NOTE(review): nothing here catches exceptions thrown by
// detail::ArchiveGraph -- confirm whether that is intended.
void ArchiveGraph(vineyard::ObjectID frag_id, const grape::CommSpec& comm_spec,
vineyard::Client& client, const gs::rpc::GSParams& params,
bl::result<void>& result_out) {
result_out = detail::ArchiveGraph(frag_id, comm_spec, client, params);
}

void ToArrowFragment(
vineyard::Client& client, const grape::CommSpec& comm_spec,
std::shared_ptr<gs::IFragmentWrapper>& wrapper_in,
Expand Down
1 change: 1 addition & 0 deletions coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def pre_process(self, dag_def, dag_bodies, loader_op_bodies):
or op.op == types_pb2.TRANSFORM_GRAPH
or op.op == types_pb2.PROJECT_TO_SIMPLE
or op.op == types_pb2.ADD_LABELS
or op.op == types_pb2.ARCHIVE_GRAPH
):
op = self._maybe_register_graph(op)
return dag_def, dag_bodies
Expand Down
4 changes: 4 additions & 0 deletions coordinator/gscoordinator/template/CMakeLists.template
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ elseif (PROPERTY_GRAPH_FRAME)
if (ENABLE_JAVA_SDK)
target_compile_definitions(${FRAME_NAME} PRIVATE ENABLE_JAVA_SDK)
endif()
# Define ENABLE_GAR for this frame only when vineyard was built with GAR
# support (BUILD_VINEYARD_GRAPH_WITH_GAR) and the graph type string matches
# "ArrowVertexMap".
if(BUILD_VINEYARD_GRAPH_WITH_GAR AND ${_graph_type} MATCHES "ArrowVertexMap")
target_compile_definitions(${FRAME_NAME} PRIVATE -DENABLE_GAR)
endif()
target_include_directories(${FRAME_NAME} PRIVATE utils)
set_target_properties(${FRAME_NAME} PROPERTIES COMPILE_FLAGS "-fPIC")
elseif (PROJECT_FRAME)
Expand Down
13 changes: 13 additions & 0 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,8 @@ def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901
_pre_process_for_add_column_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.UNLOAD_GRAPH:
_pre_process_for_unload_graph_op(op, op_result_pool, key_to_op, **kwargs)
if op.op == types_pb2.ARCHIVE_GRAPH:
_pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs)
if op.op in (
types_pb2.CONTEXT_TO_NUMPY,
types_pb2.CONTEXT_TO_DATAFRAME,
Expand Down Expand Up @@ -1234,6 +1236,17 @@ def _get_all_e_props_id(schema, label):
del op.attr[types_pb2.EDGE_COLLECTIONS]


def _pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs):
    """Fill in the graph attributes of an ARCHIVE_GRAPH op from its parent.

    The op must have exactly one parent. The parent's graph key is copied into
    ``GRAPH_NAME``; when the parent's graph definition carries a
    ``VineyardInfoPb`` extension, its vineyard id is copied into
    ``VINEYARD_ID`` as well.
    """
    assert len(op.parents) == 1
    parent_graph_def = op_result_pool[op.parents[0]].graph_def
    op.attr[types_pb2.GRAPH_NAME].CopyFrom(utils.s_to_attr(parent_graph_def.key))
    if not parent_graph_def.extension.Is(graph_def_pb2.VineyardInfoPb.DESCRIPTOR):
        # No vineyard information attached to the parent graph.
        return
    vineyard_info = graph_def_pb2.VineyardInfoPb()
    parent_graph_def.extension.Unpack(vineyard_info)
    op.attr[types_pb2.VINEYARD_ID].CopyFrom(
        utils.i_to_attr(vineyard_info.vineyard_id)
    )


# Below are selector transformation part, which will transform label / property
# names to corresponding id.

Expand Down
1 change: 1 addition & 0 deletions python/graphscope/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from graphscope.framework.errors import *
from graphscope.framework.graph import Graph
from graphscope.framework.graph_builder import load_from
from graphscope.framework.graph_builder import load_from_gar
from graphscope.version import __version__

__doc__ = """
Expand Down
7 changes: 7 additions & 0 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,13 @@ def load_from(self, *args, **kwargs):
with default_session(self):
return graphscope.load_from(*args, **kwargs)

def load_from_gar(self, *args, **kwargs):
    """Load a graph from GAR format files with this session as the default.

    All arguments are forwarded unchanged to :meth:`graphscope.load_from_gar`;
    see that function for details.
    """
    as_default = default_session(self)
    with as_default:
        return graphscope.load_from_gar(*args, **kwargs)

def _run_on_local(self):
self._config_params["port"] = None
self._config_params["vineyard_socket"] = ""
Expand Down
28 changes: 28 additions & 0 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def add_labels_to_graph(graph, loader_op):
types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
}
# inferred from the context of the dag.
config.update({types_pb2.GRAPH_NAME: utils.s_to_attr("")})
Expand Down Expand Up @@ -1068,3 +1069,30 @@ def fetch_gremlin_result(result_set, fetch_type="one"):
output_types=types_pb2.RESULTS,
)
return op


def archive_graph(graph, path):
    """Build the operation that archives a graph to GAR format.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Graph to
            archive.
        path (str): Path to archive the graph to; passed to the op as
            ``GRAPH_INFO_PATH``.

    Returns:
        An ``ARCHIVE_GRAPH`` operation whose only input is the graph's op.
    """
    config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path),
    }
    return Operation(
        graph.session_id,
        types_pb2.ARCHIVE_GRAPH,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.NULL_OUTPUT,
    )

0 comments on commit f42ca77

Please sign in to comment.