Skip to content

Commit

Permalink
fix(interactive): Fix PathExpand related bugs (#3538)
Browse files Browse the repository at this point in the history
Support PathExpand with multiple edge triplet. Fix #3467
  • Loading branch information
zhanglei1949 committed Feb 6, 2024
1 parent 87c4c75 commit 3b0d736
Show file tree
Hide file tree
Showing 12 changed files with 543 additions and 30 deletions.
8 changes: 8 additions & 0 deletions flex/engines/hqps_db/core/null_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <limits>
#include <tuple>
#include "flex/engines/hqps_db/core/utils/hqps_utils.h"
#include "flex/engines/hqps_db/structures/path.h"

namespace gs {

Expand Down Expand Up @@ -45,6 +46,13 @@ struct NullRecordCreator<std::tuple<T...>> {
}
};

// NullRecordCreator specialization for Path records: the null value is
// whatever Path<VID_T, LabelT>::Null() returns.
template <typename VID_T, typename LabelT>
struct NullRecordCreator<Path<VID_T, LabelT>> {
  using path_type = Path<VID_T, LabelT>;

  // Returns the null Path by delegating to the Path type itself.
  static inline path_type GetNull() { return path_type::Null(); }
};

template <size_t Ind = 0, typename... T>
static inline bool IsNull(const std::tuple<T...>& tuple) {
if constexpr (Ind + 1 < sizeof...(T)) {
Expand Down
13 changes: 12 additions & 1 deletion flex/engines/hqps_db/core/operator/get_v.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ class GetVertex {
std::array<label_id_t, num_labels>& req_labels,
Filter<EXPRESSION, SELECTOR...>& filter) {
auto req_label_vec = array_to_vec(req_labels);
auto labels = set.GetLabels();
std::vector<label_id_t> labels = set.GetLabels(v_opt);

// remove duplicate from labels
std::sort(labels.begin(), labels.end());
labels.erase(std::unique(labels.begin(), labels.end()), labels.end());
Expand Down Expand Up @@ -260,6 +261,16 @@ class GetVertex {
return set.GetVertices(v_opt, filter.expr_, property_getters_array);
}

// Gets vertices from a path set without fetching any vertex property.
// The requested labels are converted from std::array to std::vector and
// passed, together with v_opt, to PathSet::GetVertices. `graph` and `filter`
// are accepted but not used in this function.
template <size_t num_labels, typename EXPRESSION, typename... SELECTOR>
static auto GetNoPropVFromPathSetImpl(
    const GRAPH_INTERFACE& graph, const PathSet<vertex_id_t, label_id_t>& set,
    VOpt v_opt, std::array<label_id_t, num_labels>& req_labels,
    Filter<EXPRESSION, SELECTOR...>& filter) {
  auto wanted_labels = array_to_vec(req_labels);
  return set.GetVertices(v_opt, wanted_labels);
}

// User-defined expression
// for vertex set with multiple labels, i.e. two_label or general vertex set.
// do project.
Expand Down
132 changes: 124 additions & 8 deletions flex/engines/hqps_db/core/operator/path_expand.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,100 @@ class PathExpand {
return std::make_pair(std::move(path_set), std::move(offsets));
}

// PathExpand Path with multiple edge triplet.
template <typename VERTEX_SET_T, typename LabelT, size_t get_v_num_labels,
typename EDGE_FILTER_T, typename VERTEX_FILTER_T>
static auto PathExpandP(
const GRAPH_INTERFACE& graph, const VERTEX_SET_T& vertex_set,
PathExpandVMultiTripletOpt<LabelT, EDGE_FILTER_T, get_v_num_labels,
VERTEX_FILTER_T>&& path_expand_opt) {
auto& range = path_expand_opt.range_;
auto& edge_expand_opt = path_expand_opt.edge_expand_opt_;
auto& get_v_opt = path_expand_opt.get_v_opt_;
auto& edge_triplets = edge_expand_opt.edge_label_triplets_;
auto& vertex_other_labels = get_v_opt.v_labels_;
auto vertex_other_labels_vec = array_to_vec(vertex_other_labels);

std::vector<std::vector<vertex_id_t>> other_vertices;
std::vector<std::vector<label_id_t>> other_labels_vec;
std::vector<std::vector<offset_t>> other_offsets;
auto& vertices_vec = vertex_set.GetVertices();
std::vector<label_id_t> src_label_id_vec;
std::vector<label_id_t> src_labels_set;
static_assert(VERTEX_SET_T::is_row_vertex_set ||
VERTEX_SET_T::is_two_label_set ||
VERTEX_SET_T::is_general_set,
"Unsupported vertex set type");
if constexpr (VERTEX_SET_T::is_row_vertex_set) {
auto src_label = vertex_set.GetLabel();
src_label_id_vec =
std::vector<label_id_t>(vertices_vec.size(), vertex_set.GetLabel());
src_labels_set = {src_label};
} else if constexpr (VERTEX_SET_T::is_two_label_set) {
auto src_label_vec = vertex_set.GetLabelVec();
src_labels_set = array_to_vec(vertex_set.GetLabels());
src_label_id_vec = label_key_vec_2_label_id_vec(src_label_vec);
} else {
src_labels_set = vertex_set.GetLabels();
src_label_id_vec = label_key_vec_2_label_id_vec(vertex_set.GetLabelVec());
}
std::tie(other_vertices, other_labels_vec, other_offsets) =
path_expandp_multi_triplet(graph, edge_triplets,
vertex_other_labels_vec,
edge_expand_opt.direction_, vertices_vec,
src_labels_set, src_label_id_vec, range);

// The path are stored in a compressed way, and we flat it.
std::vector<offset_t> res_offsets;
res_offsets.reserve(vertices_vec.size() + 1);
res_offsets.emplace_back(0);

std::vector<std::vector<Path<vid_t, LabelT>>> cur_path, next_path;
std::vector<std::vector<Path<vid_t, LabelT>>> prev_path;
// indexed by src_vid,
for (size_t i = 0; i < vertices_vec.size(); ++i) {
std::vector<Path<vid_t, LabelT>> tmp_path;
tmp_path.emplace_back(vertices_vec[i], src_label_id_vec[i]);
cur_path.emplace_back(tmp_path);
}
prev_path.resize(vertices_vec.size());

for (auto j = range.start_; j < range.limit_; ++j) {
next_path.clear();
auto& cur_offset_vec = other_offsets[j];
CHECK(cur_path.size() == vertices_vec.size());
next_path.resize(vertices_vec.size());

for (auto i = 0; i < vertices_vec.size(); ++i) {
auto& tmp_path = cur_path[i];
auto start = cur_offset_vec[i];
auto end = cur_offset_vec[i + 1];
for (auto k = start; k < end; ++k) {
auto& next_vid = other_vertices[j][k];
auto& label = other_labels_vec[j][k];
for (auto& path : tmp_path) {
path.EmplaceBack(next_vid, label);
next_path[i].emplace_back(path);
path.PopBack();
}
}
// push all next_path[i] to tmp_path
prev_path[i].insert(prev_path[i].end(), next_path[i].begin(),
next_path[i].end());
next_path[i].swap(cur_path[i]);
}
}

std::vector<Path<vid_t, LabelT>> res_path;
for (auto i = 0; i < vertices_vec.size(); ++i) {
auto& tmp_path = prev_path[i];
res_path.insert(res_path.end(), tmp_path.begin(), tmp_path.end());
res_offsets.emplace_back(res_path.size());
}
return std::make_pair(PathSet<vertex_id_t, label_id_t>(std::move(res_path)),
std::move(res_offsets));
}

// Path expand to vertices with columns.
// PathExpand to vertices with vertex properties also retrieved
template <typename... V_SET_T, typename VERTEX_FILTER_T, typename LabelT,
Expand Down Expand Up @@ -741,21 +835,14 @@ class PathExpand {
// Expand from vertices, with multiple edge triplets.
// The intermediate vertices can also have multiple labels, and are expanded
// with multiple edge triplets.
static auto path_expandv_multi_triplet(
static auto path_expandp_multi_triplet(
const GRAPH_INTERFACE& graph,
const std::vector<std::array<label_id_t, 3>>&
edge_label_triplets, // src, dst, edge
const std::vector<label_id_t>& get_v_labels, const Direction& direction,
const std::vector<vertex_id_t>& vertices_vec,
const std::vector<label_id_t>& src_labels_set,
const std::vector<label_id_t>& src_v_labels_vec, const Range& range) {
// (range, other_label_ind, vertices)
LOG(INFO) << "PathExpandV with multiple edge triplets: "
<< gs::to_string(edge_label_triplets)
<< ", direction: " << gs::to_string(direction)
<< ", vertices size: " << vertices_vec.size()
<< ", src_labels_set: " << gs::to_string(src_labels_set)
<< ", range: " << range.start_ << ", " << range.limit_;
std::vector<std::vector<vertex_id_t>> other_vertices;
std::vector<std::vector<label_id_t>> other_labels_vec;
std::vector<std::vector<offset_t>> other_offsets;
Expand Down Expand Up @@ -924,6 +1011,35 @@ class PathExpand {
tmp_cur_offset[other_offsets[cur_hop - 1][i]]);
}
}
return std::make_tuple(std::move(other_vertices),
std::move(other_labels_vec),
std::move(other_offsets));
}

static auto path_expandv_multi_triplet(
const GRAPH_INTERFACE& graph,
const std::vector<std::array<label_id_t, 3>>&
edge_label_triplets, // src, dst, edge
const std::vector<label_id_t>& get_v_labels, const Direction& direction,
const std::vector<vertex_id_t>& vertices_vec,
const std::vector<label_id_t>& src_labels_set,
const std::vector<label_id_t>& src_v_labels_vec, const Range& range) {
// (range, other_label_ind, vertices)
LOG(INFO) << "PathExpandV with multiple edge triplets: "
<< gs::to_string(edge_label_triplets)
<< ", direction: " << gs::to_string(direction)
<< ", vertices size: " << vertices_vec.size()
<< ", src_labels_set: " << gs::to_string(src_labels_set)
<< ", range: " << range.start_ << ", " << range.limit_;

std::vector<std::vector<vertex_id_t>> other_vertices;
std::vector<std::vector<label_id_t>> other_labels_vec;
std::vector<std::vector<offset_t>> other_offsets;

std::tie(other_vertices, other_labels_vec, other_offsets) =
path_expandp_multi_triplet(graph, edge_label_triplets, get_v_labels,
direction, vertices_vec, src_labels_set,
src_v_labels_vec, range);

// select vertices that are in range and are in vertex_other_labels.
std::vector<vertex_id_t> res_vertices;
Expand Down
27 changes: 18 additions & 9 deletions flex/engines/hqps_db/core/operator/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -926,10 +926,11 @@ class SinkOp {
}

// sink for compressed path set
template <size_t Ind, size_t act_tag_id, typename VID_T, typename LabelT>
template <size_t Ind, size_t act_tag_id, typename PATH_SET_T,
typename std::enable_if_t<(PATH_SET_T::is_path_set)>* = nullptr>
static void sink_col_impl(const GRAPH_INTERFACE& graph,
results::CollectiveResults& results_vec,
const CompressedPathSet<VID_T, LabelT>& path_set,
const PATH_SET_T& path_set,
const std::vector<size_t>& repeat_offsets,
int32_t tag_id) {
if (repeat_offsets.empty()) {
Expand Down Expand Up @@ -981,16 +982,24 @@ class SinkOp {
}
}

// TODO(zhanglei): This is a temporary solution for sinking paths to results.
// Our physical plan only generates the EndV option, so we can only sink the
// end vertex to results.
// If we sink all vertices to results, the cypher driver seems to fail to
// parse paths of different lengths, reporting:
// <Tried to construct a path that is not built like a path: even number of
// elements>
static void add_path_to_pb(const Path<vid_t, label_id_t>& path,
results::GraphPath& mutable_path) {
for (size_t i = 0; i <= path.length(); ++i) {
// path's length + 1 = number of nodes
vid_t vid;
label_id_t label;
std::tie(label, vid) = path.GetNode(i);
mutable_path.add_path()->mutable_vertex()->set_id(
encode_unique_vertex_id(label, vid));
if (path.length() <= 0) {
return;
}
vid_t vid;
label_id_t label;
std::tie(label, vid) = path.GetNode(path.length() - 1);
auto vertex = mutable_path.add_path()->mutable_vertex();
vertex->set_id(encode_unique_vertex_id(label, vid));
vertex->mutable_label()->set_id(label);
}

static vid_t encode_unique_vertex_id(label_id_t label_id, vid_t vid) {
Expand Down
26 changes: 26 additions & 0 deletions flex/engines/hqps_db/core/sync_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,32 @@ class SyncEngine : public BaseEngine {
// old context will be abandon here.
}

/// Expand to Path.
/// Runs PathExpand<GRAPH_INTERFACE>::PathExpandP from the context node tagged
/// `alias_to_use` and appends the resulting path set to the context.
/// Only PathOpt::Arbitrary combined with ResultOpt::EndV is accepted; any
/// other option is reported through LOG(FATAL).
template <AppendOpt opt, int alias_to_use, typename CTX_HEAD_T, int cur_alias,
          int base_tag, typename... CTX_PREV, typename LabelT,
          typename EDGE_FILTER_T, size_t get_v_num_labels,
          typename VERTEX_FILTER_T>
static auto PathExpandP(
    const GRAPH_INTERFACE& graph,
    Context<CTX_HEAD_T, cur_alias, base_tag, CTX_PREV...>&& ctx,
    PathExpandVMultiTripletOpt<LabelT, EDGE_FILTER_T, get_v_num_labels,
                               VERTEX_FILTER_T>&& path_expand_opt) {
  const bool is_arbitrary_path =
      path_expand_opt.path_opt_ == PathOpt::Arbitrary;
  if (!is_arbitrary_path) {
    LOG(FATAL) << "Only support Arbitrary path now";
  }
  const bool is_end_vertex_result =
      path_expand_opt.result_opt_ == ResultOpt::EndV;
  if (!is_end_vertex_result) {
    LOG(FATAL) << "Only support EndV now";
  }

  // The node to expand from, and the expansion itself.
  auto& start_node = gs::Get<alias_to_use>(ctx);
  auto [path_set, path_offsets] = PathExpand<GRAPH_INTERFACE>::PathExpandP(
      graph, start_node, std::move(path_expand_opt));

  // Append the path set as a new context node and update offsets; the old
  // context is abandoned here.
  return ctx.template AddNode<opt>(std::move(path_set),
                                   std::move(path_offsets), alias_to_use);
}

// get no props, just filter
template <
AppendOpt opt, int alias_to_use, typename CTX_HEAD_T, int cur_alias,
Expand Down
59 changes: 59 additions & 0 deletions flex/engines/hqps_db/core/utils/keyed.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "flex/engines/hqps_db/structures/multi_vertex_set/keyed_row_vertex_set.h"
#include "flex/engines/hqps_db/structures/multi_vertex_set/row_vertex_set.h"
#include "flex/engines/hqps_db/structures/multi_vertex_set/two_label_vertex_set.h"
#include "flex/engines/hqps_db/structures/path.h"

namespace gs {

Expand Down Expand Up @@ -536,6 +537,34 @@ struct KeyedAggT<GI, FlatEdgeSet<VID_T, LabelT, EDATA_T>, AggFunc::COUNT,
}
};

// COUNT over a CompressedPathSet column.
// The aggregate builder is a default-constructed CountBuilder<tag_id>; the
// set, graph and selectors arguments are accepted but not consulted.
template <typename GI, typename VID_T, typename LabelT, int tag_id>
struct KeyedAggT<GI, CompressedPathSet<VID_T, LabelT>, AggFunc::COUNT,
                 std::tuple<grape::EmptyType>,
                 std::integer_sequence<int32_t, tag_id>> {
  using aggregate_res_builder_t = CountBuilder<tag_id>;
  using agg_res_t = Collection<size_t>;

  static aggregate_res_builder_t create_agg_builder(
      const CompressedPathSet<VID_T, LabelT>& set, const GI& graph,
      std::tuple<PropertySelector<grape::EmptyType>>& selectors) {
    return aggregate_res_builder_t();
  }
};

// COUNT over a (flat) PathSet column.
// The aggregate builder is a default-constructed CountBuilder<tag_id>; the
// set, graph and selectors arguments are accepted but not consulted.
template <typename GI, typename VID_T, typename LabelT, int tag_id>
struct KeyedAggT<GI, PathSet<VID_T, LabelT>, AggFunc::COUNT,
                 std::tuple<grape::EmptyType>,
                 std::integer_sequence<int32_t, tag_id>> {
  using aggregate_res_builder_t = CountBuilder<tag_id>;
  using agg_res_t = Collection<size_t>;

  static aggregate_res_builder_t create_agg_builder(
      const PathSet<VID_T, LabelT>& set, const GI& graph,
      std::tuple<PropertySelector<grape::EmptyType>>& selectors) {
    return aggregate_res_builder_t();
  }
};

// COUNT DISTINCT for EdgeSets.
template <typename GI, typename VID_T, typename LabelT, int tag_id>
struct KeyedAggT<GI, UnTypedEdgeSet<VID_T, LabelT, typename GI::sub_graph_t>,
Expand Down Expand Up @@ -585,6 +614,36 @@ struct KeyedAggT<GI, FlatEdgeSet<VID_T, LabelT, EDATA_T>,
}
};

// COUNT_DISTINCT over a CompressedPathSet column.
// The aggregate builder is a DistinctCountBuilder constructed from the path
// set itself; the graph and selectors arguments are accepted but not
// consulted.
template <typename GI, typename VID_T, typename LabelT, int tag_id>
struct KeyedAggT<GI, CompressedPathSet<VID_T, LabelT>, AggFunc::COUNT_DISTINCT,
                 std::tuple<grape::EmptyType>,
                 std::integer_sequence<int32_t, tag_id>> {
  using path_set_t = CompressedPathSet<VID_T, LabelT>;
  using aggregate_res_builder_t = DistinctCountBuilder<tag_id, path_set_t>;
  using agg_res_t = Collection<size_t>;

  static aggregate_res_builder_t create_agg_builder(
      const path_set_t& set, const GI& graph,
      std::tuple<PropertySelector<grape::EmptyType>>& selectors) {
    return DistinctCountBuilder<tag_id, path_set_t>(set);
  }
};

// COUNT_DISTINCT over a (flat) PathSet column.
// The aggregate builder is a DistinctCountBuilder constructed from the path
// set itself; the graph and selectors arguments are accepted but not
// consulted.
template <typename GI, typename VID_T, typename LabelT, int tag_id>
struct KeyedAggT<GI, PathSet<VID_T, LabelT>, AggFunc::COUNT_DISTINCT,
                 std::tuple<grape::EmptyType>,
                 std::integer_sequence<int32_t, tag_id>> {
  using path_set_t = PathSet<VID_T, LabelT>;
  using aggregate_res_builder_t = DistinctCountBuilder<tag_id, path_set_t>;
  using agg_res_t = Collection<size_t>;

  static aggregate_res_builder_t create_agg_builder(
      const path_set_t& set, const GI& graph,
      std::tuple<PropertySelector<grape::EmptyType>>& selectors) {
    return DistinctCountBuilder<tag_id, path_set_t>(set);
  }
};

template <typename GI, typename... SET_T, typename... PropSelectorT,
int... tag_ids>
struct KeyedAggMultiColT<GI, std::tuple<SET_T...>, AggFunc::COUNT_DISTINCT,
Expand Down
Loading

0 comments on commit 3b0d736

Please sign in to comment.