Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(analytical): Support extend label data for graph in eager mode. #3288

Merged
merged 6 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions analytical_engine/core/io/property_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ struct Graph {
bool retain_oid = true;
bool compact_edges = false;
bool use_perfect_hash = false;
// This is used to extend the label data
// when user try to add data to existed labels.
// the available option is 0/1/2,
// 0 stands for no extend,
// 1 stands for extend vertex label data,
// 2 stands for extend edge label data.
int extend_type = 0;

std::string SerializeToString() const {
std::stringstream ss;
Expand Down Expand Up @@ -289,13 +296,15 @@ inline bl::result<std::shared_ptr<detail::Graph>> ParseCreatePropertyGraph(
BOOST_LEAF_AUTO(compact_edges, params.Get<bool>(rpc::COMPACT_EDGES, false));
BOOST_LEAF_AUTO(use_perfect_hash,
params.Get<bool>(rpc::USE_PERFECT_HASH, false));
BOOST_LEAF_AUTO(extend_type, params.Get<int64_t>(rpc::EXTEND_LABEL_DATA, 0));

auto graph = std::make_shared<detail::Graph>();
graph->directed = directed;
graph->generate_eid = generate_eid;
graph->retain_oid = retain_oid;
graph->compact_edges = compact_edges;
graph->use_perfect_hash = use_perfect_hash;
graph->extend_type = extend_type;

const auto& large_attr = params.GetLargeAttr();
for (const auto& item : large_attr.chunk_list().items()) {
Expand Down
69 changes: 69 additions & 0 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,82 @@ class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader<OID_T, VID_T> {
return Base::addVerticesAndEdges(frag_id, std::move(raw_v_e_tables));
}

bl::result<vineyard::ObjectID> AddDataToExistedVLable(
vineyard::ObjectID frag_id, label_id_t label_id) {
BOOST_LEAF_CHECK(initPartitioner());
BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables());
return Base::addDataToExistedVLabel(frag_id, label_id,
std::move(raw_v_e_tables));
}

bl::result<vineyard::ObjectID> AddDataToExistedELable(
vineyard::ObjectID frag_id, label_id_t label_id) {
BOOST_LEAF_CHECK(initPartitioner());
BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables());
return Base::addDataToExistedELabel(frag_id, label_id,
std::move(raw_v_e_tables));
}

boost::leaf::result<vineyard::ObjectID> AddLabelsToFragmentAsFragmentGroup(
vineyard::ObjectID frag_id) {
BOOST_LEAF_AUTO(new_frag_id, AddLabelsToFragment(frag_id));
VY_OK_OR_RAISE(client_.Persist(new_frag_id));
return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_);
}

bl::result<vineyard::ObjectID> ExtendLabelData(vineyard::ObjectID frag_id,
int extend_type) {
// find duplicate label id
assert(extend_type);
auto frag = std::dynamic_pointer_cast<vineyard::ArrowFragmentBase>(
client_.GetObject(frag_id));
vineyard::PropertyGraphSchema schema = frag->schema();
std::vector<std::string> labels;
label_id_t target_label_id = -1;
if (extend_type == 1)
labels = schema.GetVertexLabels();
else if (extend_type == 2)
labels = schema.GetEdgeLabels();

std::map<std::string, label_id_t> label_set;
for (size_t i = 0; i < labels.size(); ++i)
label_set[labels[i]] = i;

if (extend_type == 1) {
for (size_t i = 0; i < graph_info_->vertices.size(); ++i) {
auto it = label_set.find(graph_info_->vertices[i]->label);
if (it != label_set.end()) {
target_label_id = it->second;
break;
}
}
} else if (extend_type == 2) {
for (size_t i = 0; i < graph_info_->edges.size(); ++i) {
auto it = label_set.find(graph_info_->edges[i]->label);
if (it != label_set.end()) {
target_label_id = it->second;
break;
}
}
} else {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"extend type is invalid");
}

if (target_label_id == -1)
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"label not found");
vineyard::ObjectID new_frag_id;
if (extend_type == 1) {
BOOST_LEAF_ASSIGN(new_frag_id,
AddDataToExistedVLable(frag_id, target_label_id));
} else if (extend_type == 2) {
BOOST_LEAF_ASSIGN(new_frag_id,
AddDataToExistedELable(frag_id, target_label_id));
}
return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_);
}

bl::result<void> initPartitioner() {
#ifdef HASH_PARTITION
Base::partitioner_.Init(comm_spec_.fnum());
Expand Down
11 changes: 9 additions & 2 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,16 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id,
BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params));
using loader_t = gs::arrow_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
loader_t loader(client, comm_spec, graph_info);
vineyard::ObjectID frag_group_id = vineyard::InvalidObjectID();

BOOST_LEAF_AUTO(frag_group_id,
loader.AddLabelsToFragmentAsFragmentGroup(origin_frag_id));
if (graph_info->extend_type) {
BOOST_LEAF_ASSIGN(
frag_group_id,
loader.ExtendLabelData(origin_frag_id, graph_info->extend_type));
} else {
BOOST_LEAF_ASSIGN(frag_group_id, loader.AddLabelsToFragmentAsFragmentGroup(
origin_frag_id));
}
MPI_Barrier(comm_spec.comm());

LOG_IF(INFO, comm_spec.worker_id() == 0)
Expand Down
3 changes: 3 additions & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ enum ParamKey {
IS_FROM_GAR = 70;
GRAPH_INFO_PATH = 71;

// Extend label data
EXTEND_LABEL_DATA = 80;

APP_NAME = 100;
APP_ALGO = 101;
APP_LIBRARY_PATH = 102;
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/framework/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def add_labels_to_graph(graph, loader_op):
config = {
types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(graph._extend_label_data),
types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
Expand Down
23 changes: 18 additions & 5 deletions python/graphscope/framework/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(self):
self._vertex_map = graph_def_pb2.GLOBAL_VERTEX_MAP
self._compact_edges = False
self._use_perfect_hash = False
self._extend_label_data = 0

@property
def session_id(self):
Expand Down Expand Up @@ -215,6 +216,7 @@ def _construct_op_of_empty_graph(self):
config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(self._vertex_map)
config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(self._compact_edges)
config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(self._use_perfect_hash)
config[types_pb2.EXTEND_LABEL_DATA] = utils.i_to_attr(self._extend_label_data)
return dag_utils.create_graph(
self.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=None, attrs=config
)
Expand Down Expand Up @@ -304,6 +306,11 @@ def __init__(
self._vertex_map = utils.vertex_map_type_to_enum(vertex_map)
self._compact_edges = compact_edges
self._use_perfect_hash = use_perfect_hash
# for need to extend label in 'eager mode' when add_vertices and add_edges
# 0 - not extending label
# 1 - extend vertex label
# 2 - extend edge label
self._extend_label_data = 0

# list of pair <parent_op_key, VertexLabel/EdgeLabel>
self._unsealed_vertices_and_edges = list()
Expand Down Expand Up @@ -505,10 +512,15 @@ def add_vertices(
"Cannot incrementally add vertices to graphs with compacted edges, "
"please use `graphscope.load_from()` instead."
)
if label in self._v_labels:
raise ValueError(f"Label {label} already existed in graph.")
if not self._v_labels and self._e_labels:
raise ValueError("Cannot manually add vertices after inferred vertices.")
# currently not support local_vertex_map
if label in self._v_labels:
self._extend_label_data = 1
warnings.warn(
f"Label {label} already existed in graph"
", origin label data will be extend."
)
unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges)
vertex_label = VertexLabel(
label=label,
Expand All @@ -520,7 +532,8 @@ def add_vertices(
)
unsealed_vertices_and_edges.append((self.op.key, vertex_label))
v_labels = deepcopy(self._v_labels)
v_labels.append(label)
if self._extend_label_data == 0:
v_labels.append(label)
# generate and add a loader op to dag
loader_op = dag_utils.create_loader(vertex_label)
self._session.dag.add_op(loader_op)
Expand Down Expand Up @@ -616,7 +629,7 @@ def add_edges(

if self.evaluated:
if label in self._e_labels:
raise ValueError(f"Label {label} already existed in graph")
self._extend_label_data = 2
siyuan0322 marked this conversation as resolved.
Show resolved Hide resolved

unsealed_vertices = list()
unsealed_edges = list()
Expand All @@ -634,7 +647,7 @@ def add_edges(
v_labels.append(dst_label)

parent = self
if label in self.e_labels:
if not self.evaluated and label in self.e_labels:
# aggregate op with the same edge label
fork = False
unsealed_vertices_and_edges = list()
Expand Down
10 changes: 0 additions & 10 deletions python/graphscope/tests/unittest/test_create_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,16 +426,6 @@ def test_error_on_ambigious_default_label(
graph = graph.add_edges(student_group_e, "group")


def test_error_on_duplicate_labels(graphscope_session, student_group_e, student_v):
graph = graphscope_session.g()
graph = graph.add_vertices(student_v, "student")
with pytest.raises(ValueError, match="Label student already existed in graph"):
graph = graph.add_vertices(student_v, "student")
graph = graph.add_edges(student_group_e, "group")
with pytest.raises(ValueError, match="already existed in graph"):
graph = graph.add_edges(student_group_e, "group")


def test_load_complex_graph(
graphscope_session,
score_e,
Expand Down
55 changes: 46 additions & 9 deletions python/graphscope/tests/unittest/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os

import numpy as np
import pandas as pd
import pytest

import graphscope
Expand Down Expand Up @@ -278,15 +279,6 @@ def test_add_vertices_edges(graphscope_session):
graph = graph.add_vertices(
Loader(f"{prefix}/software.csv", delimiter="|"), "software"
)

with pytest.raises(ValueError, match="already existed in graph"):
graph = graph.add_edges(
Loader(f"{prefix}/knows.csv", delimiter="|"),
"knows",
src_label="software",
dst_label="software",
)

graph = graph.add_edges(
Loader(f"{prefix}/created.csv", delimiter="|"),
"created",
Expand All @@ -298,6 +290,51 @@ def test_add_vertices_edges(graphscope_session):
assert graph.schema.edge_labels == ["knows", "created"]


def test_extend_vertices_edges(graphscope_session):
prefix = os.path.expandvars("${GS_TEST_DIR}/")
verts = pd.read_csv(f"{prefix}/p2p_v.csv")
edges = pd.read_csv(f"{prefix}/p2p_e.csv")
test_list = ["v11308", "v50089", "v60129"]

g1 = graphscope_session.g(oid_type="std::string")
g1 = g1.add_vertices(Loader(verts), "person")
g1 = g1.add_edges(Loader(edges), "knows", src_label="person", dst_label="person")

g2 = graphscope_session.g(oid_type="std::string")
g2 = g2.add_vertices(Loader(verts[:12980]), "person")
g2 = g2.add_vertices(Loader(verts[12980:31530]), "person")
g2 = g2.add_vertices(Loader(verts[31530:]), "person")

g2 = g2.add_edges(
Loader(edges[:2302]), "knows", src_label="person", dst_label="person"
)
g2 = g2.add_edges(
Loader(edges[2302:40021]), "knows", src_label="person", dst_label="person"
)
g2 = g2.add_edges(
Loader(edges[40021:]), "knows", src_label="person", dst_label="person"
)

sg1 = g1.project(vertices={"person": ["id"]}, edges={"knows": ["dist"]})
sg2 = g2.project(vertices={"person": ["id"]}, edges={"knows": ["dist"]})
for src in test_list:
res1 = graphscope.sssp(sg1, src=src, weight="dist")
res2 = graphscope.sssp(sg2, src=src, weight="dist")
df1 = res1.to_dataframe(selector={"id": "v.id", "r": "r"}).sort_values(
by=["id"], ignore_index=True
)
df2 = res2.to_dataframe(selector={"id": "v.id", "r": "r"}).sort_values(
by=["id"], ignore_index=True
)
if not df1.equals(df2):
pytest.raises(
AssertionError, "different sssp result got after extending graph data"
)

del g1, g2, sg1, sg2
siyuan0322 marked this conversation as resolved.
Show resolved Hide resolved
print("pass graph extending test")


def test_complicated_add_edges(graphscope_session):
prefix = os.path.expandvars("${GS_TEST_DIR}/modern_graph")
graph = graphscope_session.g()
Expand Down
Loading