Skip to content

Commit

Permalink
fix(interactive): Fix bugs When Counting on Multiple Columns (#3442)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
as titled.

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #3410

---------

Co-authored-by: xiaolei.zl <xiaolei.zl@alibaba-inc.com>
Co-authored-by: Longbin Lai <longbin.lailb@alibaba-inc.com>
  • Loading branch information
3 people committed Dec 21, 2023
1 parent 74ab2d8 commit 0435bb8
Show file tree
Hide file tree
Showing 12 changed files with 577 additions and 50 deletions.
79 changes: 73 additions & 6 deletions flex/engines/hqps_db/core/operator/group_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,40 @@ struct GroupKeyResT<CTX_T, GroupKey<col_id, T>> {
using result_t = typename KeyedT<set_t, PropertySelector<T>>::keyed_set_t;
};

template <typename CTX_T, typename AGG_T>
template <typename CTX_T, typename AGG_T, typename Enable = void>
struct GroupValueResT;

// The SET_T could b a single set or a tuple of sets.
template <typename SET_T, AggFunc agg_func, typename SELECTOR_TUPLE>
struct GroupValueResTImpl;

template <typename CTX_T, AggFunc agg_func, typename... SELECTOR, int Is>
struct GroupValueResT<CTX_T, AggregateProp<agg_func, std::tuple<SELECTOR...>,
std::integer_sequence<int, Is>>> {
// Specialize for single set
template <typename CTX_T, AggFunc agg_func, typename... SELECTOR, int... Is>
struct GroupValueResT<CTX_T,
AggregateProp<agg_func, std::tuple<SELECTOR...>,
std::integer_sequence<int, Is...>>,
typename std::enable_if<(sizeof...(Is) == 1)>::type> {
using old_set_t = std::remove_const_t<std::remove_reference_t<decltype(
std::declval<CTX_T>().template GetNode<Is>())>>;
std::declval<CTX_T>().template GetNode<FirstElement<Is...>::value>())>>;
using result_t =
typename GroupValueResTImpl<old_set_t, agg_func,
std::tuple<SELECTOR...>>::result_t;
};

// Specialize for multiple sets
template <typename CTX_T, AggFunc agg_func, typename... SELECTOR, int... Is>
struct GroupValueResT<CTX_T,
AggregateProp<agg_func, std::tuple<SELECTOR...>,
std::integer_sequence<int, Is...>>,
typename std::enable_if<(sizeof...(Is) > 1)>::type> {
using old_set_tuple_t =
std::tuple<std::remove_const_t<std::remove_reference_t<decltype(
std::declval<CTX_T>().template GetNode<Is>())>>...>;
using result_t =
typename GroupValueResTImpl<old_set_tuple_t, agg_func,
std::tuple<SELECTOR...>>::result_t;
};

// specialization for count for single tag
// TODO: count for pairs.
template <typename SET_T>
Expand All @@ -92,6 +110,18 @@ struct GroupValueResTImpl<SET_T, AggFunc::COUNT_DISTINCT,
using result_t = Collection<size_t>;
};

// PropSelectorTuple doesn't effect the result type.
template <typename SET_TUPLE_T, typename PropSelectorTuple>
struct GroupValueResTImpl<SET_TUPLE_T, AggFunc::COUNT_DISTINCT,
PropSelectorTuple> {
using result_t = Collection<size_t>;
};

template <typename SET_TUPLE_T, typename PropSelectorTuple>
struct GroupValueResTImpl<SET_TUPLE_T, AggFunc::COUNT, PropSelectorTuple> {
using result_t = Collection<size_t>;
};

template <typename T>
struct GroupValueResTImpl<Collection<T>, AggFunc::SUM,
std::tuple<PropertySelector<grape::EmptyType>>> {
Expand Down Expand Up @@ -571,7 +601,7 @@ class GroupByOp {

template <typename... SET_T, typename HEAD_T, AggFunc _agg_func, typename T,
int tag_id>
static auto create_keyed_value_set_builder(
static auto create_keyed_value_set_builder_single_tag(
const GRAPH_INTERFACE& graph, const std::tuple<SET_T...>& tuple,
const HEAD_T& head,
AggregateProp<_agg_func, std::tuple<PropertySelector<T>>,
Expand All @@ -591,6 +621,43 @@ class GroupByOp {
}
}

// For aggregate on multiple tags, we currently only support count distinct
// and count.
template <typename... SET_T, typename HEAD_T, AggFunc _agg_func,
typename PROP_TUPLE_T, int... tag_id>
static auto create_keyed_value_set_builder_multi_tag(
const GRAPH_INTERFACE& graph, const std::tuple<SET_T...>& tuple,
const HEAD_T& head,
AggregateProp<_agg_func, PROP_TUPLE_T,
std::integer_sequence<int32_t, tag_id...>>& agg) {
// create const ref tuple from tuple and head using std::cref
// construct a const ref tuple from tuple
auto const_ref_tuple = make_tuple_of_const_refs(tuple);

auto old_set = std::tuple_cat(const_ref_tuple, std::tie(head));
// get the tuple from old_set, with tag_ids
auto old_set_tuple = std::tuple{gs::get_from_tuple<tag_id>(old_set)...};

return KeyedAggMultiColT<GRAPH_INTERFACE, decltype(old_set_tuple),
_agg_func, PROP_TUPLE_T,
std::integer_sequence<int32_t, tag_id...>>::
create_agg_builder(old_set_tuple, graph, agg.selectors_);
}

template <typename... SET_T, typename HEAD_T, AggFunc _agg_func,
typename PROP_SELECTOR_TUPLE, int... tag_ids>
static auto create_keyed_value_set_builder(
const GRAPH_INTERFACE& graph, const std::tuple<SET_T...>& tuple,
const HEAD_T& head,
AggregateProp<_agg_func, PROP_SELECTOR_TUPLE,
std::integer_sequence<int32_t, tag_ids...>>& agg) {
if constexpr (sizeof...(tag_ids) == 1) {
return create_keyed_value_set_builder_single_tag(graph, tuple, head, agg);
} else {
return create_keyed_value_set_builder_multi_tag(graph, tuple, head, agg);
}
}

// create builder for single key_alias
template <typename... SET_T, typename HEAD_T, int col_id, typename KEY_PROP>
static auto create_unkeyed_set_builder(
Expand Down
31 changes: 31 additions & 0 deletions flex/engines/hqps_db/core/utils/hqps_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,27 @@ inline auto make_offset_vector(size_t m, size_t n) {
return offsets;
}

template <int I, int... Is>
struct FirstElement {
static constexpr int value = I;
};

// Create a tuple of const references to the elements of a tuple.
template <typename... Args>
auto make_tuple_of_const_refs(const std::tuple<Args...>& t) {
return std::apply(
[](const Args&... args) { return std::make_tuple(std::cref(args)...); },
t);
}

template <typename T>
struct ConstRefRemoveHelper;

template <typename... T>
struct ConstRefRemoveHelper<std::tuple<T...>> {
using type = std::tuple<std::remove_const_t<std::remove_reference_t<T>>...>;
};

// first n ele in tuple type

template <int n, typename In, typename... Out>
Expand Down Expand Up @@ -429,6 +450,16 @@ constexpr auto tuple_slice(T&& t) {
std::make_index_sequence<r - l>{});
}

// [l, tuple_size - 1]
template <size_t l, typename T>
constexpr auto tuple_slice(T&& t) {
static_assert(std::tuple_size<std::decay_t<T>>::value > l,
"slice index out of bounds");
return tuple_slice_impl<l>(
std::forward<T>(t),
std::make_index_sequence<std::tuple_size<std::decay_t<T>>::value - l>{});
}

template <int Is, typename... T,
typename std::enable_if<(Is >= 0)>::type* = nullptr>
inline auto get_from_tuple(std::tuple<T...>& tuple) {
Expand Down
43 changes: 42 additions & 1 deletion flex/engines/hqps_db/core/utils/keyed.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,21 @@ struct KeyedT<Collection<T>, PropertySelector<grape::EmptyType>> {
// when keyed with aggregation function, (which we currently only support
// collection)

/// @brief Helper to get keyed set type with aggregation fnc
/// @brief Helper to get keyed set type with aggregation func
/// @tparam T
/// @tparam ValueT Keyed prop type
template <typename GI, typename T, AggFunc agg_func, typename Props,
typename Tags>
struct KeyedAggT;

/// @brief Helper to get keyed set type with aggregation func, which is applied
/// on multiple column
/// @tparam T
/// @tparam ValueT Keyed prop type
template <typename GI, typename SET_TUPLE_T, AggFunc agg_func, typename Props,
typename Tags>
struct KeyedAggMultiColT;

template <typename GI, typename LabelT, typename VID_T, typename... T,
typename PropT, int tag_id>
struct KeyedAggT<GI, RowVertexSet<LabelT, VID_T, T...>, AggFunc::COUNT,
Expand Down Expand Up @@ -465,6 +473,39 @@ struct KeyedAggT<GI, FlatEdgeSet<VID_T, LabelT, EDATA_T>,
}
};

template <typename GI, typename... SET_T, typename... PropSelectorT,
int... tag_ids>
struct KeyedAggMultiColT<GI, std::tuple<SET_T...>, AggFunc::COUNT_DISTINCT,
std::tuple<PropSelectorT...>,
std::integer_sequence<int32_t, tag_ids...>> {
using agg_res_t = Collection<size_t>;
// get the tuple of sets from the tuple of tags.
using aggregate_res_builder_t =
MultiColDistinctCountBuilder<std::tuple<SET_T...>, tag_ids...>;

static aggregate_res_builder_t create_agg_builder(
const std::tuple<SET_T...>& set, const GI& graph,
std::tuple<PropSelectorT...>& selectors) {
return aggregate_res_builder_t();
}
};

template <typename GI, typename... SET_T, typename... PropSelectorT,
int... tag_ids>
struct KeyedAggMultiColT<GI, std::tuple<SET_T...>, AggFunc::COUNT,
std::tuple<PropSelectorT...>,
std::integer_sequence<int32_t, tag_ids...>> {
using agg_res_t = Collection<size_t>;
// get the tuple of sets from the tuple of tags.
using aggregate_res_builder_t = MultiColCountBuilder<tag_ids...>;

static aggregate_res_builder_t create_agg_builder(
const std::tuple<SET_T...>& set, const GI& graph,
std::tuple<PropSelectorT...>& selectors) {
return aggregate_res_builder_t();
}
};

template <typename LabelT, typename KEY_T, typename VID_T, typename... T,
typename ELE, typename DATA>
static inline auto insert_into_builder_v2_impl(
Expand Down
76 changes: 76 additions & 0 deletions flex/engines/hqps_db/structures/collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define ENGINES_HQPS_DS_COLLECTION_H_

#include <tuple>
#include <unordered_set>
#include <vector>

#include "flex/engines/hqps_db/core/null_record.h"
Expand Down Expand Up @@ -427,6 +428,39 @@ class CountBuilder {
std::vector<size_t> vec_;
};

template <int... tag>
class MultiColCountBuilder {
public:
MultiColCountBuilder() {}

// insert tuple at index ind.
// if the ele_value equal to invalid_value, then do not insert.
template <typename ELE_TUPLE, typename DATA_TUPLE>
void insert(size_t ind, const ELE_TUPLE& tuple, const DATA_TUPLE& data) {
auto cur_ele_tuple =
std::tuple_cat(tuple_slice<1>(gs::get_from_tuple<tag>(tuple))...);
while (vec_.size() <= ind) {
vec_.emplace_back(0);
}
using cur_ele_tuple_t =
std::remove_const_t<std::remove_reference_t<decltype(cur_ele_tuple)>>;
// remove the const and reference for each type in cur_ele_tuple_t
using cur_ele_tuple_rm_const_ref_t =
typename ConstRefRemoveHelper<cur_ele_tuple_t>::type;
if (cur_ele_tuple !=
NullRecordCreator<cur_ele_tuple_rm_const_ref_t>::GetNull()) {
++vec_[ind];
} else {
VLOG(10) << "ele is null";
}
}

Collection<size_t> Build() { return Collection<size_t>(std::move(vec_)); }

private:
std::vector<size_t> vec_;
};

template <int tag_id, typename T, typename Enable = void>
class DistinctCountBuilder;

Expand Down Expand Up @@ -581,6 +615,48 @@ class DistinctCountBuilder<tag_id, TwoLabelVertexSetImpl<VID_T, LabelT, T...>> {
std::array<VID_T, 2> min_v, max_v, range_size;
};

// DistinctCountBuilder for multiple sets together
template <typename SET_TUPLE_T, int... TAG_IDs>
class MultiColDistinctCountBuilder;

template <typename... SET_Ts, int... TAG_IDs>
class MultiColDistinctCountBuilder<std::tuple<SET_Ts...>, TAG_IDs...> {
public:
using set_ele_t = std::tuple<typename SET_Ts::element_t...>;
MultiColDistinctCountBuilder() {}

template <typename ELE_TUPLE_T, typename DATA_TUPLE>
void insert(size_t ind, const ELE_TUPLE_T& tuple, const DATA_TUPLE& data) {
// construct the ref tuple from tuple,with TAG_IDS,
// get element tuple from index_ele_tuple_t
auto cur_ele_tuple =
std::tuple_cat(tuple_slice<1>(gs::get_from_tuple<TAG_IDs>(tuple))...);

while (vec_of_set_.size() <= ind) {
vec_of_set_.emplace_back(
std::unordered_set<set_ele_t, boost::hash<set_ele_t>>());
}
auto& cur_set = vec_of_set_[ind];
cur_set.insert(cur_ele_tuple);
VLOG(10) << "tuple: " << gs::to_string(cur_ele_tuple)
<< ",all ele: " << gs::to_string(tuple) << "insert at ind: " << ind
<< ", res: " << cur_set.size();
}

Collection<size_t> Build() {
std::vector<size_t> res;
res.reserve(vec_of_set_.size());
for (auto& set : vec_of_set_) {
res.emplace_back(set.size());
}
return Collection<size_t>(std::move(res));
}

private:
std::vector<std::unordered_set<set_ele_t, boost::hash<set_ele_t>>>
vec_of_set_;
};

template <typename T, int tag_id>
class SumBuilder {
public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.alibaba.graphscope.common.ir.type.GraphSchemaType;
import com.alibaba.graphscope.common.jna.IrCoreLibrary;
import com.alibaba.graphscope.common.jna.type.*;
import com.alibaba.graphscope.gaia.proto.Common;
import com.alibaba.graphscope.gaia.proto.GraphAlgebra;
import com.alibaba.graphscope.gaia.proto.OuterExpression;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -365,32 +366,41 @@ public PhysicalNode visit(GraphLogicalAggregate aggregate) {
if (operands.isEmpty()) {
throw new IllegalArgumentException(
"operands in aggregate call should not be empty");
} else if (operands.size() > 1) {
throw new UnsupportedOperationException(
"aggregate on multiple variables is unsupported yet");
}
FfiAggOpt ffiAggOpt = Utils.ffiAggOpt(groupCalls.get(i));
int aliasId = fields.get(i + groupKeys.size()).getIndex();
FfiAlias.ByValue ffiAlias =
Common.NameOrId alias =
(aliasId == AliasInference.DEFAULT_ID)
? ArgUtils.asNoneAlias()
: ArgUtils.asAlias(aliasId);
Preconditions.checkArgument(
operands.get(0) instanceof RexGraphVariable,
"each expression in aggregate call should be type %s, but is %s",
RexGraphVariable.class,
operands.get(0).getClass());
OuterExpression.Variable var =
operands.get(0)
.accept(new RexToProtoConverter(true, isColumnId, this.rexBuilder))
.getOperators(0)
.getVar();
? Common.NameOrId.newBuilder().build()
: Common.NameOrId.newBuilder().setId(aliasId).build();
List<OuterExpression.Variable> vars =
operands.stream()
.map(
k -> {
Preconditions.checkArgument(
k instanceof RexGraphVariable,
"each operand in aggregate call should be type %s,"
+ " but is %s",
RexGraphVariable.class,
k.getClass());
return k.accept(
new RexToProtoConverter(
true, isColumnId, this.rexBuilder))
.getOperators(0)
.getVar();
})
.collect(Collectors.toList());
checkFfiResult(
LIB.addGroupbyAggFnPb(
ptrGroup,
new FfiPbPointer.ByValue(var.toByteArray()),
ffiAggOpt,
ffiAlias));
new FfiPbPointer.ByValue(
GraphAlgebra.GroupBy.AggFunc.newBuilder()
.addAllVars(vars)
.setAggregate(
com.alibaba.graphscope.common.ir.runtime.proto
.Utils.protoAggFn(groupCalls.get(i)))
.setAlias(alias)
.build()
.toByteArray())));
}
com.alibaba.graphscope.common.ir.runtime.proto.Utils.protoRowType(
aggregate.getRowType(), isColumnId)
Expand Down
Loading

0 comments on commit 0435bb8

Please sign in to comment.