Skip to content

Commit

Permalink
Support extend label data for graph in eager mode.
Browse files Browse the repository at this point in the history
Signed-off-by: SighingSnow <1263750383@qq.com>

Committed-by: SighingSnow from Dev container

Committed-by: SighingSnow from Dev container

Committed-by: SighingSnow from Dev container
  • Loading branch information
SighingSnow committed Oct 12, 2023
1 parent 6e33f85 commit de4fb7c
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 7 deletions.
4 changes: 4 additions & 0 deletions analytical_engine/core/io/property_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ struct Graph {
bool retain_oid = true;
bool compact_edges = false;
bool use_perfect_hash = false;
int extend_type = 0; // 0 means no need to extend label data

std::string SerializeToString() const {
std::stringstream ss;
Expand Down Expand Up @@ -289,13 +290,16 @@ inline bl::result<std::shared_ptr<detail::Graph>> ParseCreatePropertyGraph(
BOOST_LEAF_AUTO(compact_edges, params.Get<bool>(rpc::COMPACT_EDGES, false));
BOOST_LEAF_AUTO(use_perfect_hash,
params.Get<bool>(rpc::USE_PERFECT_HASH, false));
BOOST_LEAF_AUTO(extend_type,
params.Get<int64_t>(rpc::EXTEND_LABEL_DATA));

auto graph = std::make_shared<detail::Graph>();
graph->directed = directed;
graph->generate_eid = generate_eid;
graph->retain_oid = retain_oid;
graph->compact_edges = compact_edges;
graph->use_perfect_hash = use_perfect_hash;
graph->extend_type = extend_type;

const auto& large_attr = params.GetLargeAttr();
for (const auto& item : large_attr.chunk_list().items()) {
Expand Down
52 changes: 52 additions & 0 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,65 @@ class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader<OID_T, VID_T> {
return Base::addVerticesAndEdges(frag_id, std::move(raw_v_e_tables));
}

bl::result<vineyard::ObjectID> AddDataToExistedVLable(
vineyard::ObjectID frag_id, label_id_t label_id) {
BOOST_LEAF_CHECK(initPartitioner());
BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables());
return Base::addDataToExistedVLabel(frag_id, label_id,
std::move(raw_v_e_tables));
}

bl::result<vineyard::ObjectID> AddDataToExistedELable(
vineyard::ObjectID frag_id,label_id_t label_id) {
BOOST_LEAF_CHECK(initPartitioner());
BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables());
return Base::addDataToExistedELabel(frag_id, label_id,
std::move(raw_v_e_tables));
}

boost::leaf::result<vineyard::ObjectID> AddLabelsToFragmentAsFragmentGroup(
vineyard::ObjectID frag_id) {
BOOST_LEAF_AUTO(new_frag_id, AddLabelsToFragment(frag_id));
VY_OK_OR_RAISE(client_.Persist(new_frag_id));
return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_);
}

bl::result<vineyard::ObjectID> ExtendLabelData(
vineyard::ObjectID frag_id, int extend_type) {
// find duplicate label id
assert(extend_type);
auto frag = std::dynamic_pointer_cast<vineyard::ArrowFragmentBase>(
client_.GetObject(frag_id));
vineyard::PropertyGraphSchema schema = frag->schema();
std::vector<std::string> labels;
label_id_t target_label_id = -1;
if (extend_type == 1)
labels = schema.GetVertexLabels();
else if (extend_type == 2)
labels = schema.GetEdgeLabels();

std::set<std::string> label_set(labels.begin(), labels.end());
for (size_t i = 0; i < graph_info_->vertices.size(); ++i) {
if (label_set.find(graph_info_->vertices[i]->label) != label_set.end()) {
target_label_id = i;
break;
}
}
assert(target_label_id != -1);
vineyard::ObjectID new_frag_id;
std::cout<<"extend type: "<<extend_type<<std::endl;
std::cout<<"target label id: "<<target_label_id<<std::endl;

if (extend_type == 1) {
BOOST_LEAF_ASSIGN(new_frag_id,
AddDataToExistedVLable(frag_id, target_label_id));
} else if (extend_type == 2) {
BOOST_LEAF_ASSIGN(new_frag_id,
AddDataToExistedELable(frag_id, target_label_id));
}
return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_);
}

bl::result<void> initPartitioner() {
#ifdef HASH_PARTITION
Base::partitioner_.Init(comm_spec_.fnum());
Expand Down
10 changes: 8 additions & 2 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,15 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id,
BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params));
using loader_t = gs::arrow_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
loader_t loader(client, comm_spec, graph_info);

BOOST_LEAF_AUTO(frag_group_id,
vineyard::ObjectID frag_group_id = vineyard::InvalidObjectID();

if(graph_info->extend_type) {
BOOST_LEAF_ASSIGN(frag_group_id,
loader.ExtendLabelData(origin_frag_id, graph_info->extend_type));
} else {
BOOST_LEAF_ASSIGN(frag_group_id,
loader.AddLabelsToFragmentAsFragmentGroup(origin_frag_id));
}
MPI_Barrier(comm_spec.comm());

LOG_IF(INFO, comm_spec.worker_id() == 0)
Expand Down
3 changes: 3 additions & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ enum ParamKey {
IS_FROM_GAR = 70;
GRAPH_INFO_PATH = 71;

// Extend label data
EXTEND_LABEL_DATA = 80;

APP_NAME = 100;
APP_ALGO = 101;
APP_LIBRARY_PATH = 102;
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def add_labels_to_graph(graph, loader_op):
config = {
types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(graph._extend_label_data),
types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
Expand Down
21 changes: 16 additions & 5 deletions python/graphscope/framework/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(self):
self._vertex_map = graph_def_pb2.GLOBAL_VERTEX_MAP
self._compact_edges = False
self._use_perfect_hash = False
self._extend_label_data = 0

@property
def session_id(self):
Expand Down Expand Up @@ -215,6 +216,7 @@ def _construct_op_of_empty_graph(self):
config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(self._vertex_map)
config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(self._compact_edges)
config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(self._use_perfect_hash)
config[types_pb2.EXTEND_LABEL_DATA] = utils.i_to_attr(self._extend_label_data)
return dag_utils.create_graph(
self.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=None, attrs=config
)
Expand Down Expand Up @@ -304,6 +306,11 @@ def __init__(
self._vertex_map = utils.vertex_map_type_to_enum(vertex_map)
self._compact_edges = compact_edges
self._use_perfect_hash = use_perfect_hash
# for need to extend label in 'eager mode' when add_vertices and add_edges
# 0 - not extending label
# 1 - extend vertex label
# 2 - extend edge label
self._extend_label_data = 0

# list of pair <parent_op_key, VertexLabel/EdgeLabel>
self._unsealed_vertices_and_edges = list()
Expand Down Expand Up @@ -505,10 +512,13 @@ def add_vertices(
"Cannot incrementally add vertices to graphs with compacted edges, "
"please use `graphscope.load_from()` instead."
)
if label in self._v_labels:
raise ValueError(f"Label {label} already existed in graph.")
if not self._v_labels and self._e_labels:
raise ValueError("Cannot manually add vertices after inferred vertices.")
# currently not support local_vertex_map
if label in self._v_labels:
self._extend_label_data = 1
warnings.warn(f"Label {label} already existed in graph"
", origin label data will be extend.")
unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges)
vertex_label = VertexLabel(
label=label,
Expand All @@ -520,7 +530,8 @@ def add_vertices(
)
unsealed_vertices_and_edges.append((self.op.key, vertex_label))
v_labels = deepcopy(self._v_labels)
v_labels.append(label)
if self._extend_label_data == 0:
v_labels.append(label)
# generate and add a loader op to dag
loader_op = dag_utils.create_loader(vertex_label)
self._session.dag.add_op(loader_op)
Expand Down Expand Up @@ -616,7 +627,7 @@ def add_edges(

if self.evaluated:
if label in self._e_labels:
raise ValueError(f"Label {label} already existed in graph")
self._extend_label_data = 2

unsealed_vertices = list()
unsealed_edges = list()
Expand All @@ -634,7 +645,7 @@ def add_edges(
v_labels.append(dst_label)

parent = self
if label in self.e_labels:
if self.evaluated != True and label in self.e_labels:
# aggregate op with the same edge label
fork = False
unsealed_vertices_and_edges = list()
Expand Down

0 comments on commit de4fb7c

Please sign in to comment.