Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate index support in Cascades #1864

Merged
merged 45 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
447930a
* WIP.
hatyo Sep 13, 2022
3cb497b
* experimentation with matching.
hatyo Sep 20, 2022
1ed9e53
* first version works: matching aggregate index as value index.
hatyo Sep 21, 2022
e487e73
* WIP.
hatyo Sep 22, 2022
8ea4153
* adjustments.
hatyo Sep 22, 2022
7ac45b3
* improve match candidate generation.
hatyo Sep 26, 2022
cf76d8d
* refactoring, more tests.
hatyo Sep 26, 2022
abb70d1
* minor modifications.
hatyo Sep 26, 2022
e9f7ebd
* WIP.
hatyo Sep 29, 2022
b22dc3b
* fixes.
hatyo Sep 30, 2022
39e6e5f
* end to end tests are working.
hatyo Oct 12, 2022
59d45e4
* refactoring + documentation.
hatyo Oct 12, 2022
819fba5
* refactoring + documentation, tests passing again.
hatyo Oct 12, 2022
3ada69f
* further refactoring.
hatyo Oct 12, 2022
a4394bb
* fix rebasing conflicts.
hatyo Oct 12, 2022
79d3031
* clean up.
hatyo Oct 12, 2022
18c792a
* exclude support for count(*) (for now).
hatyo Oct 12, 2022
c23c482
* WIP.
hatyo Oct 13, 2022
a58feea
* tests are passing.
hatyo Oct 14, 2022
189965d
Merge remote-tracking branch 'upstream/main' into aggregate_index_sup…
hatyo Oct 19, 2022
1206df0
* fix indirect merge errors.
hatyo Oct 19, 2022
4ae2fba
* modifications, add ordinal path to FieldValue.
hatyo Oct 25, 2022
d59a608
* add subsumedBy method to Value.
hatyo Oct 25, 2022
b01d6de
* restore broken tests.
hatyo Oct 26, 2022
2e76952
* adapt hash codes.
hatyo Oct 26, 2022
1c0e93f
* address checkstyle violations.
hatyo Oct 26, 2022
b4ea9a3
* address PMD violations.
hatyo Oct 26, 2022
b54097a
* address SonarCloud reported bug.
hatyo Oct 26, 2022
e229b20
* address comments.
hatyo Oct 31, 2022
2f6ad2d
* address comments (2).
hatyo Oct 31, 2022
c96c740
* reference grouping column by ordinal not by name.
hatyo Oct 31, 2022
ba1fae8
Merge remote-tracking branch 'upstream/main' into aggregate_index_sup…
hatyo Oct 31, 2022
d471134
* fix merge skew.
hatyo Oct 31, 2022
17f23d5
* fix checkstyle violations.
hatyo Oct 31, 2022
ba20239
* fix equalsWithoutChildren in AggregateValues.
hatyo Oct 31, 2022
8cb3147
* fix PMD violations.
hatyo Nov 1, 2022
f3e4343
* address comments.
hatyo Nov 3, 2022
bb0c946
* cleanup.
hatyo Nov 3, 2022
5ad81fb
* fix a small bug in MinEverLong instantiation.
hatyo Nov 3, 2022
3ed2994
* address comments (2).
hatyo Nov 3, 2022
849b37a
* factor out Field usage and only rely on FieldPath.
hatyo Nov 7, 2022
4835cea
* fix failing tests.
hatyo Nov 7, 2022
41d8fa3
* code style fixes, review comments.
hatyo Nov 7, 2022
6ec847a
* address more comments.
hatyo Nov 7, 2022
a8081a2
* minor fix in comments.
hatyo Nov 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ The Record Layer is a Java API providing a record-oriented store on top of Found
across one or more record types, and a query planner capable of
automatic selection of indexes.
* **Many record stores, shared schema** - The Record Layer provides the
the ability to support many discrete record store instances, all with
ability to support many discrete record store instances, all with
a shared (and evolving) schema. For example, rather than modeling a
single database in which to store all users' data, each user can be
given their own record store, perhaps sharded across different FDB
Expand Down
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The Guava dependency version has been updated to 31.1. Projects may need to chec
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Support planning aggregate indexes in Cascades. [(Issue #1885)](https://github.com/FoundationDB/fdb-record-layer/issues/1885)
* **Breaking change** Change 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -690,9 +692,12 @@ public static Map<String, Descriptors.FieldDescriptor> getFieldDescriptorMapFrom

@Nonnull
private static Map<String, Descriptors.FieldDescriptor> getFieldDescriptorMap(@Nonnull final Stream<RecordType> recordTypeStream) {
// todo: should be removed https://github.com/FoundationDB/fdb-record-layer/issues/1884
return recordTypeStream
.sorted(Comparator.comparing(RecordType::getName))
.flatMap(recordType -> recordType.getDescriptor().getFields().stream())
.collect(Collectors.groupingBy(Descriptors.FieldDescriptor::getName,
LinkedHashMap::new,
normen662 marked this conversation as resolved.
Show resolved Hide resolved
normen662 marked this conversation as resolved.
Show resolved Hide resolved
Collectors.reducing(null,
(fieldDescriptor, fieldDescriptor2) -> {
Verify.verify(fieldDescriptor != null || fieldDescriptor2 != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ public String getName() {
return name;
}

/**
* Returns the type of the index.
*
* @return the type of the index.
* @see IndexTypes
*/
@Nonnull
public String getType() {
return type;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* AggregateIndexExpansionVisitor.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.query.plan.cascades;

import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.metadata.IndexTypes;
import com.apple.foundationdb.record.metadata.RecordType;
import com.apple.foundationdb.record.metadata.expressions.GroupingKeyExpression;
import com.apple.foundationdb.record.metadata.expressions.KeyExpression;
import com.apple.foundationdb.record.query.plan.cascades.expressions.GroupByExpression;
import com.apple.foundationdb.record.query.plan.cascades.expressions.MatchableSortExpression;
import com.apple.foundationdb.record.query.plan.cascades.expressions.SelectExpression;
import com.apple.foundationdb.record.query.plan.cascades.predicates.ValueComparisonRangePredicate;
import com.apple.foundationdb.record.query.plan.cascades.typing.Type;
import com.apple.foundationdb.record.query.plan.cascades.typing.TypeRepository;
import com.apple.foundationdb.record.query.plan.cascades.values.AggregateValue;
import com.apple.foundationdb.record.query.plan.cascades.values.CountValue;
import com.apple.foundationdb.record.query.plan.cascades.values.FieldValue;
import com.apple.foundationdb.record.query.plan.cascades.values.IndexOnlyAggregateValue;
import com.apple.foundationdb.record.query.plan.cascades.values.NumericAggregationValue;
import com.apple.foundationdb.record.query.plan.cascades.values.QuantifiedObjectValue;
import com.apple.foundationdb.record.query.plan.cascades.values.RecordConstructorValue;
import com.apple.foundationdb.record.query.plan.cascades.values.Value;
import com.apple.foundationdb.record.query.plan.cascades.values.Values;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Expands an aggregate index into a {@link MatchCandidate}. The generation will expand a {@link KeyExpression} into a
* group by triplet QGM comprising a select-where, group-by-expression, and select-having, the triplet is followed
* by an optional {@link MatchableSortExpression} that defines the sort order of the match candidate stream of records.
*/
public class AggregateIndexExpansionVisitor extends KeyExpressionExpansionVisitor
implements ExpansionVisitor<KeyExpressionExpansionVisitor.VisitorState> {

@Nonnull
private static final Supplier<Set<String>> allowedIndexTypes = Suppliers.memoize(AggregateIndexExpansionVisitor::computeAllowedIndexesMap);

@Nonnull
private static final Supplier<Map<String, BuiltInFunction<? extends Value>>> aggregateMap = Suppliers.memoize(AggregateIndexExpansionVisitor::computeAggregateMap);

@Nonnull
private final Index index;

@Nonnull
private final Collection<RecordType> recordTypes;

@Nonnull
private final GroupingKeyExpression groupingKeyExpression;

/**
* Constructs a new instance of {@link AggregateIndexExpansionVisitor}.
*
* @param index The target index.
* @param recordTypes The indexed record types.
*/
public AggregateIndexExpansionVisitor(@Nonnull final Index index, @Nonnull final Collection<RecordType> recordTypes) {
Preconditions.checkArgument(allowedIndexTypes.get().contains(index.getType()));
Preconditions.checkArgument(index.getRootExpression() instanceof GroupingKeyExpression);
this.index = index;
this.groupingKeyExpression = ((GroupingKeyExpression)index.getRootExpression());
this.recordTypes = recordTypes;
}

/**
* Creates a new match candidate representing the aggregate index.
*
* @param baseQuantifierSupplier a quantifier supplier to create base data access
* @param ignored the primary key of the data object the caller wants to access, this parameter is ignored since
* an aggregate index does not possess primary key information, must be {@code null}.
* @param isReverse an indicator whether the result set is expected to be returned in reverse order.
* @return A match candidate representing the aggregate index.
*/
@Nonnull
@Override
public MatchCandidate expand(@Nonnull final java.util.function.Supplier<Quantifier.ForEach> baseQuantifierSupplier,
@Nullable final KeyExpression ignored,
final boolean isReverse) {
Verify.verify(ignored == null);
final var baseQuantifier = baseQuantifierSupplier.get();
final var groupingAndGroupedCols = Value.fromKeyExpressions(groupingKeyExpression.normalizeKeyForPositions(), baseQuantifier.getAlias(), baseQuantifier.getFlowedObjectType());
final var groupingValues = groupingAndGroupedCols.subList(0, groupingKeyExpression.getGroupingCount());
final var groupedValues = groupingAndGroupedCols.subList(groupingKeyExpression.getGroupingCount(), groupingAndGroupedCols.size());

if (groupedValues.size() > 1) {
throw new UnsupportedOperationException(String.format("aggregate index is expected to contain exactly one aggregation, however it contains %d aggregations", groupedValues.size()));
}

// 1. create a SELECT-WHERE expression.
final var selectWhereQun = constructSelectWhere(baseQuantifier, groupingValues);

// 2. create a GROUP-BY expression on top.
final var groupByQun = constructGroupBy(baseQuantifier.getAlias(), groupedValues, selectWhereQun);

// 3. construct SELECT-HAVING with SORT on top.
final var selectHavingAndPlaceholderAliases = constructSelectHaving(groupByQun);
final var selectHaving = selectHavingAndPlaceholderAliases.getLeft();
final var placeHolderAliases = selectHavingAndPlaceholderAliases.getRight();

// 4. add sort on top, if necessary, this will be absorbed later on as an ordering property of the match candidate.
final var maybeWithSort = placeHolderAliases.isEmpty()
? GroupExpressionRef.of(selectHaving) // single group, sort by constant
: GroupExpressionRef.of(new MatchableSortExpression(placeHolderAliases, isReverse, selectHaving));

final var traversal = ExpressionRefTraversal.withRoot(maybeWithSort);
return new AggregateIndexMatchCandidate(index,
traversal,
placeHolderAliases,
recordTypes,
baseQuantifier.getFlowedObjectType(),
groupByQun.getRangesOver().get().getResultValue(),
selectHaving.getResultValue());
}

@Nonnull
private Quantifier constructSelectWhere(@Nonnull final Quantifier.ForEach baseQuantifier, final List<? extends Value> groupingValues) {
final var allExpansionsBuilder = ImmutableList.<GraphExpansion>builder();
allExpansionsBuilder.add(GraphExpansion.ofQuantifier(baseQuantifier));

// add the SELECT-WHERE part, where we expose grouping and grouped columns, allowing query fragments that governs
// only these columns to properly bind to this part, similar to how value indices work.
final var keyValues = Lists.<Value>newArrayList();
final var valueValues = Lists.<Value>newArrayList();
final var state = VisitorState.of(keyValues, valueValues, baseQuantifier, ImmutableList.of(), 0, 0);
final var selectWhereGraphExpansion = pop(groupingKeyExpression.getWholeKey().expand(push(state)));

// add an RCV column representing the grouping columns as the first result set column
final var groupingValue = RecordConstructorValue.ofColumns(groupingValues
.stream()
.map(Column::unnamedOf) // REMOVE: name is important?
.collect(Collectors.toList()));

// flow all underlying quantifiers in their own QOV columns.
final var builder = GraphExpansion.builder();
// we need to refer to the following colum later on in GroupByExpression, but since its ordinal position is fixed, we can simply refer
// to it using an ordinal FieldAccessor (we do the same in plan generation).
builder.addResultColumn(Column.unnamedOf(groupingValue));
Stream.concat(Stream.of(baseQuantifier), selectWhereGraphExpansion.getQuantifiers().stream())
.forEach(qun -> {
final var quantifiedValue = QuantifiedObjectValue.of(qun.getAlias(), qun.getFlowedObjectType());
builder.addResultColumn(Column.of(Type.Record.Field.of( quantifiedValue.getResultType(), Optional.of(qun.getAlias().getId())), quantifiedValue));
});
builder.addAllPlaceholders(selectWhereGraphExpansion.getPlaceholders());
builder.addAllPredicates(selectWhereGraphExpansion.getPredicates());
builder.addAllQuantifiers(selectWhereGraphExpansion.getQuantifiers());
allExpansionsBuilder.add(builder.build());

return Quantifier.forEach(GroupExpressionRef.of(GraphExpansion.ofOthers(allExpansionsBuilder.build()).buildSelect()));
}

@SuppressWarnings("deprecation")
@Nonnull
private Quantifier constructGroupBy(@Nonnull final CorrelationIdentifier baseQuantifierCorrelationIdentifier,
@Nonnull final List<? extends Value> groupedValue,
@Nonnull final Quantifier selectWhereQun) {
// construct aggregation RCV
final int[] cnt = {0};
final var aggregateValue = RecordConstructorValue.ofColumns(groupedValue.stream().map(gv -> {
final var prefixedFieldPath = Stream.concat(Stream.of(baseQuantifierCorrelationIdentifier.getId()), ((FieldValue)gv).getFieldPathNames().stream()).collect(Collectors.toList());
final var groupedFieldReference = FieldValue.ofFieldNames(selectWhereQun.getFlowedObjectValue(), prefixedFieldPath);
return (AggregateValue)aggregateMap.get().get(index.getType()).encapsulate(TypeRepository.newBuilder(), List.of(groupedFieldReference));
}).map(av -> Column.of(Type.Record.Field.of(av.getResultType(), Optional.of(generateAggregateFieldName(cnt[0]++))), av)).collect(Collectors.toList()));

// construct grouping column(s) value, the grouping column is _always_ fixed at position-0 in the underlying select-where.
final var groupingColsValue = FieldValue.ofOrdinalNumber(selectWhereQun.getFlowedObjectValue(), 0);

if (groupingColsValue.getResultType() instanceof Type.Record && ((Type.Record)groupingColsValue.getResultType()).getFields().isEmpty()) {
return Quantifier.forEach(GroupExpressionRef.of(new GroupByExpression(aggregateValue, null, selectWhereQun)));
} else {
return Quantifier.forEach(GroupExpressionRef.of(new GroupByExpression(aggregateValue, groupingColsValue, selectWhereQun)));
}
}

@Nonnull
private String generateAggregateFieldName(int fieldIdx) {
return index.getName() + "_" + index.getType() + "_agg_" + fieldIdx;
}

@Nonnull
private Pair<SelectExpression, List<CorrelationIdentifier>> constructSelectHaving(@Nonnull final Quantifier groupByQun) {
// the grouping value in GroupByExpression comes first (if set).
@Nullable final var groupingValueReference =
(groupByQun.getRangesOver().get() instanceof GroupByExpression && ((GroupByExpression)groupByQun.getRangesOver().get()).getGroupingValue() == null)
? null
: FieldValue.ofOrdinalNumber(groupByQun.getFlowedObjectValue(), 0);

final var aggregateValueReference = FieldValue.ofOrdinalNumber(FieldValue.ofOrdinalNumber(groupByQun.getFlowedObjectValue(), groupingValueReference == null ? 0 : 1), 0);

final var placeholderAliases = ImmutableList.<CorrelationIdentifier>builder();
final var selectHavingGraphExpansionBuilder = GraphExpansion.builder().addQuantifier(groupByQun);
if (groupingValueReference != null) {
Values.deconstructRecord(groupingValueReference).forEach(v -> {
final var field = (FieldValue)v;
final var placeholder = v.asPlaceholder(CorrelationIdentifier.uniqueID(ValueComparisonRangePredicate.Placeholder.class));
placeholderAliases.add(placeholder.getAlias());
selectHavingGraphExpansionBuilder
.addResultColumn(Column.unnamedOf(field))
.addPlaceholder(placeholder)
.addPredicate(placeholder);
});
}
selectHavingGraphExpansionBuilder.addResultColumn(Column.unnamedOf(aggregateValueReference)); // TODO should we also add the aggregate reference as a placeholder? // REMOVE: name is important?
return Pair.of(selectHavingGraphExpansionBuilder.build().buildSelect(), placeholderAliases.build());
}

@Nonnull
private static Set<String> computeAllowedIndexesMap() {
final ImmutableSet.Builder<String> setBuilder = ImmutableSet.builder();
setBuilder.add(IndexTypes.COUNT);
setBuilder.add(IndexTypes.SUM);
setBuilder.add(IndexTypes.MIN_EVER_LONG);
setBuilder.add(IndexTypes.MAX_EVER_LONG);
return setBuilder.build();
}

@Nonnull
private static Map<String, BuiltInFunction<? extends Value>> computeAggregateMap() {
final ImmutableMap.Builder<String, BuiltInFunction<? extends Value>> mapBuilder = ImmutableMap.builder();
mapBuilder.put(IndexTypes.MAX_EVER_LONG, new IndexOnlyAggregateValue.MaxEverLongFn());
mapBuilder.put(IndexTypes.MIN_EVER_LONG, new IndexOnlyAggregateValue.MinEverLongFn());
mapBuilder.put(IndexTypes.SUM, new NumericAggregationValue.SumFn());
mapBuilder.put(IndexTypes.COUNT, new CountValue.CountFn());
return mapBuilder.build();
}
}