From 75c686fbf3eafc357ce9e8d85a3aa7b20dcf5c13 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Wed, 12 Aug 2020 18:16:30 -0700 Subject: [PATCH] Support post-aggregation in ORDER-BY --- .../pinot/core/data/table/BaseTable.java | 47 +- .../data/table/ConcurrentIndexedTable.java | 26 +- .../pinot/core/data/table/IndexedTable.java | 76 +-- .../core/data/table/SimpleIndexedTable.java | 22 +- .../apache/pinot/core/data/table/Table.java | 9 +- .../pinot/core/data/table/TableResizer.java | 234 ++++++--- .../GroupByOrderByCombineOperator.java | 4 +- .../query/reduce/GroupByDataTableReducer.java | 4 +- .../core/data/table/IndexedTableTest.java | 85 +--- .../core/data/table/TableResizerTest.java | 469 +++++++----------- .../tests/BaseClusterIntegrationTestSet.java | 6 + .../pinot/perf/BenchmarkCombineGroupBy.java | 14 +- .../pinot/perf/BenchmarkIndexedTable.java | 21 +- 13 files changed, 460 insertions(+), 557 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/BaseTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/BaseTable.java index d0f2f60ed90..319395a590f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/BaseTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/BaseTable.java @@ -19,68 +19,27 @@ package org.apache.pinot.core.data.table; import java.util.Iterator; -import java.util.List; -import javax.annotation.Nullable; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.request.context.OrderByExpressionContext; /** * Base abstract implementation of Table */ -@SuppressWarnings("rawtypes") public abstract class BaseTable implements Table { protected final DataSchema _dataSchema; protected final int _numColumns; - protected final AggregationFunction[] _aggregationFunctions; - protected final int _numAggregations; - // the capacity we need to trim to - 
protected int _capacity; - // the capacity with added buffer, in order to collect more records than capacity for better precision - protected int _maxCapacity; - - protected boolean _isOrderBy; - protected TableResizer _tableResizer; - - /** - * Initializes the variables and comparators needed for the table - */ - public BaseTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions, - @Nullable List orderByExpressions, int capacity) { + protected BaseTable(DataSchema dataSchema) { _dataSchema = dataSchema; _numColumns = dataSchema.size(); - _aggregationFunctions = aggregationFunctions; - _numAggregations = aggregationFunctions.length; - addCapacityAndOrderByInfo(orderByExpressions, capacity); - } - - protected void addCapacityAndOrderByInfo(@Nullable List orderByExpressions, int capacity) { - if (orderByExpressions != null) { - _isOrderBy = true; - _tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, orderByExpressions); - - // TODO: tune these numbers and come up with a better formula (github ISSUE-4801) - // Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain recors from the PQ - if (capacity - <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize - _maxCapacity = 1_000_000; - } else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize - _maxCapacity = (int) (capacity * 1.2); - } - } else { - _isOrderBy = false; - _tableResizer = null; - _maxCapacity = capacity; - } - _capacity = capacity; } @Override public boolean merge(Table table) { Iterator iterator = table.iterator(); while (iterator.hasNext()) { + // NOTE: For performance concern, does not check the return value of the upsert(). Override this method if + // upsert() can return false. 
upsert(iterator.next()); } return true; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java index 0237b9c4e8f..2ff4ccd1ec8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java @@ -27,10 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.annotation.Nullable; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.request.context.OrderByExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,24 +39,16 @@ public class ConcurrentIndexedTable extends IndexedTable { private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIndexedTable.class); - private ConcurrentMap _lookupMap; - private ReentrantReadWriteLock _readWriteLock; + private final ConcurrentMap _lookupMap; + private final ReentrantReadWriteLock _readWriteLock; private Iterator _iterator; - private AtomicBoolean _noMoreNewRecords = new AtomicBoolean(); + private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean(); private final AtomicInteger _numResizes = new AtomicInteger(); private final AtomicLong _resizeTime = new AtomicLong(); - /** - * Initializes the data structures needed for this Table - * @param dataSchema data schema of the record's keys and values - * @param aggregationFunctions aggregation functions for the record's values - * @param orderByExpressions list of {@link OrderByExpressionContext} defining the order by - * @param capacity the capacity of the table - */ - public 
ConcurrentIndexedTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions, - @Nullable List orderByExpressions, int capacity) { - super(dataSchema, aggregationFunctions, orderByExpressions, capacity); + public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) { + super(dataSchema, queryContext, capacity); _lookupMap = new ConcurrentHashMap<>(); _readWriteLock = new ReentrantReadWriteLock(); @@ -105,7 +95,7 @@ public boolean upsert(Key key, Record newRecord) { // resize if exceeds max capacity if (_lookupMap.size() >= _maxCapacity) { - if (_isOrderBy) { + if (_hasOrderBy) { // reached capacity, resize _readWriteLock.writeLock().lock(); try { @@ -165,7 +155,7 @@ private List resizeAndSort(int trimToSize) { @Override public void finish(boolean sort) { - if (_isOrderBy) { + if (_hasOrderBy) { if (sort) { List sortedRecords = resizeAndSort(_capacity); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java index eccddb395b5..5867256db24 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java @@ -20,54 +20,64 @@ import java.util.Arrays; import java.util.List; -import javax.annotation.Nullable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.OrderByExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; /** * Base implementation of Map-based Table for indexed lookup */ +@SuppressWarnings("rawtypes") public abstract class IndexedTable extends BaseTable { - private final KeyExtractor _keyExtractor; - final int _numKeyColumns; + protected final int _numKeyColumns; + protected 
final AggregationFunction[] _aggregationFunctions; + protected final boolean _hasOrderBy; + protected final TableResizer _tableResizer; - /** - * Initializes the variables and comparators needed for the table - */ - IndexedTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions, - @Nullable List orderByExpressions, int capacity) { - super(dataSchema, aggregationFunctions, orderByExpressions, capacity); + // The capacity we need to trim to + protected final int _capacity; + // The capacity with added buffer, in order to collect more records than capacity for better precision + protected final int _maxCapacity; - _numKeyColumns = dataSchema.size() - _numAggregations; - _keyExtractor = new KeyExtractor(_numKeyColumns); - } + protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) { + super(dataSchema); - @Override - public boolean upsert(Record newRecord) { - Key key = _keyExtractor.extractKey(newRecord); - return upsert(key, newRecord); - } + List groupByExpressions = queryContext.getGroupByExpressions(); + assert groupByExpressions != null; + _numKeyColumns = groupByExpressions.size(); - /** - * Extractor for key component of a Record - * The assumption is that the keys will always be before the aggregations in the Record. It is the caller's responsibility to ensure that. 
- * This will help us avoid index lookups, while extracting keys, and also while aggregating the values - */ - private static class KeyExtractor { - private int _keyIndexes; + _aggregationFunctions = queryContext.getAggregationFunctions(); - KeyExtractor(int keyIndexes) { - _keyIndexes = keyIndexes; - } + List orderByExpressions = queryContext.getOrderByExpressions(); + if (orderByExpressions != null) { + _hasOrderBy = true; + _tableResizer = new TableResizer(dataSchema, queryContext); + _capacity = capacity; - /** - * Returns the Key from the Record - */ - Key extractKey(Record record) { - Object[] keys = Arrays.copyOf(record.getValues(), _keyIndexes); - return new Key(keys); + // TODO: tune these numbers and come up with a better formula (github ISSUE-4801) + // Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain records from the PQ + if (capacity + <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize + _maxCapacity = 1_000_000; + } else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize + _maxCapacity = (int) (capacity * 1.2); + } + } else { + _hasOrderBy = false; + _tableResizer = null; + _capacity = capacity; + _maxCapacity = capacity; } } + + @Override + public boolean upsert(Record record) { + // NOTE: The record will always have key columns (group-by expressions) in the front. This is handled in + // AggregationGroupByOrderByOperator. 
+ Object[] keyValues = Arrays.copyOf(record.getValues(), _numKeyColumns); + return upsert(new Key(keyValues), record); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java index f59de50eb33..05b53594976 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java @@ -23,11 +23,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.request.context.OrderByExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,23 +37,15 @@ public class SimpleIndexedTable extends IndexedTable { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIndexedTable.class); - private Map _lookupMap; + private final Map _lookupMap; private Iterator _iterator; private boolean _noMoreNewRecords = false; private int _numResizes = 0; private long _resizeTime = 0; - /** - * Initializes the data structures needed for this Table - * @param dataSchema data schema of the record's keys and values - * @param aggregationFunctions aggregation functions for the record's values - * @param orderByExpressions list of {@link OrderByExpressionContext} defining the order by - * @param capacity the capacity of the table - */ - public SimpleIndexedTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions, - @Nullable List orderByExpressions, int capacity) { - super(dataSchema, aggregationFunctions, orderByExpressions, capacity); + public SimpleIndexedTable(DataSchema dataSchema, 
QueryContext queryContext, int capacity) { + super(dataSchema, queryContext, capacity); _lookupMap = new HashMap<>(); } @@ -94,7 +84,7 @@ public boolean upsert(Key key, Record newRecord) { }); if (_lookupMap.size() >= _maxCapacity) { - if (_isOrderBy) { + if (_hasOrderBy) { // reached max capacity, resize resize(_capacity); } else { @@ -147,7 +137,7 @@ public Iterator iterator() { @Override public void finish(boolean sort) { - if (_isOrderBy) { + if (_hasOrderBy) { if (sort) { List sortedRecords = resizeAndSort(_capacity); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java index c88d7f3ab43..d491b119f33 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java @@ -28,17 +28,20 @@ public interface Table { /** - * Update the table with the given record + * Updates the table with the given record, returns {@code true} if the table can take more records, {@code false} + * otherwise. */ boolean upsert(Record record); /** - * Update the table with the given record, indexed on Key + * Updates the table with the given record indexed on the given key, returns {@code true} if the table can take more + * records, {@code false} otherwise. */ boolean upsert(Key key, Record record); /** - * Merge all records from given table + * Merge all records from the given table, returns {@code true} if the table can take more records, {@code false} + * otherwise. 
*/ boolean merge(Table table); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java index a8719184325..256f8bd1c4c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.data.table; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.util.ArrayList; import java.util.Arrays; @@ -28,10 +28,15 @@ import java.util.List; import java.util.Map; import java.util.PriorityQueue; -import java.util.function.Function; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.postaggregation.PostAggregationFunction; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FunctionContext; import org.apache.pinot.core.query.request.context.OrderByExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.spi.utils.ByteArray; /** @@ -39,55 +44,45 @@ */ @SuppressWarnings({"rawtypes", "unchecked"}) public class TableResizer { + private final DataSchema _dataSchema; + private final int _numGroupByExpressions; + private final Map _groupByExpressionIndexMap; + private final AggregationFunction[] _aggregationFunctions; + private final Map _aggregationFunctionIndexMap; + private final int _numOrderByExpressions; private final OrderByValueExtractor[] _orderByValueExtractors; private final Comparator _intermediateRecordComparator; - private final int _numOrderByExpressions; - TableResizer(DataSchema dataSchema, 
AggregationFunction[] aggregationFunctions, - List orderByExpressions) { + public TableResizer(DataSchema dataSchema, QueryContext queryContext) { + _dataSchema = dataSchema; - // NOTE: the assumption here is that the key columns will appear before the aggregation columns in the data schema - // This is handled in the only in the AggregationGroupByOrderByOperator for now + // NOTE: The data schema will always have group-by expressions in the front, followed by aggregation functions of + // the same order as in the query context. This is handled in AggregationGroupByOrderByOperator. - int numColumns = dataSchema.size(); - int numAggregations = aggregationFunctions.length; - int numKeyColumns = numColumns - numAggregations; - - Map columnIndexMap = new HashMap<>(); - Map aggregationColumnToFunction = new HashMap<>(); - for (int i = 0; i < numColumns; i++) { - String columnName = dataSchema.getColumnName(i); - columnIndexMap.put(columnName, i); - if (i >= numKeyColumns) { - aggregationColumnToFunction.put(columnName, aggregationFunctions[i - numKeyColumns]); - } + List groupByExpressions = queryContext.getGroupByExpressions(); + assert groupByExpressions != null; + _numGroupByExpressions = groupByExpressions.size(); + _groupByExpressionIndexMap = new HashMap<>(); + for (int i = 0; i < _numGroupByExpressions; i++) { + _groupByExpressionIndexMap.put(groupByExpressions.get(i), i); } + _aggregationFunctions = queryContext.getAggregationFunctions(); + assert _aggregationFunctions != null; + _aggregationFunctionIndexMap = queryContext.getAggregationFunctionIndexMap(); + assert _aggregationFunctionIndexMap != null; + + List orderByExpressions = queryContext.getOrderByExpressions(); + assert orderByExpressions != null; _numOrderByExpressions = orderByExpressions.size(); _orderByValueExtractors = new OrderByValueExtractor[_numOrderByExpressions]; Comparator[] comparators = new Comparator[_numOrderByExpressions]; - - for (int orderByIdx = 0; orderByIdx < _numOrderByExpressions; 
orderByIdx++) { - OrderByExpressionContext orderByExpression = orderByExpressions.get(orderByIdx); - String column = orderByExpression.getExpression().toString(); - - if (columnIndexMap.containsKey(column)) { - int index = columnIndexMap.get(column); - if (index < numKeyColumns) { - _orderByValueExtractors[orderByIdx] = new KeyColumnExtractor(index); - } else { - AggregationFunction aggregationFunction = aggregationColumnToFunction.get(column); - _orderByValueExtractors[orderByIdx] = new AggregationColumnExtractor(index, aggregationFunction); - } - } else { - throw new IllegalStateException("Could not find column " + column + " in data schema"); - } - - comparators[orderByIdx] = orderByExpression.isAsc() ? Comparator.naturalOrder() : Comparator.reverseOrder(); + for (int i = 0; i < _numOrderByExpressions; i++) { + OrderByExpressionContext orderByExpression = orderByExpressions.get(i); + _orderByValueExtractors[i] = getOrderByValueExtractor(orderByExpression.getExpression()); + comparators[i] = orderByExpression.isAsc() ? Comparator.naturalOrder() : Comparator.reverseOrder(); } - _intermediateRecordComparator = (o1, o2) -> { - for (int i = 0; i < _numOrderByExpressions; i++) { int result = comparators[i].compare(o1._values[i], o2._values[i]); if (result != 0) { @@ -98,14 +93,37 @@ public class TableResizer { }; } + /** + * Helper method to construct an OrderByValueExtractor based on the given expression. 
+ */ + private OrderByValueExtractor getOrderByValueExtractor(ExpressionContext expression) { + if (expression.getType() == ExpressionContext.Type.LITERAL) { + return new LiteralExtractor(expression.getLiteral()); + } + Integer groupByExpressionIndex = _groupByExpressionIndexMap.get(expression); + if (groupByExpressionIndex != null) { + // Group-by expression + return new GroupByExpressionExtractor(groupByExpressionIndex); + } + FunctionContext function = expression.getFunction(); + Preconditions + .checkState(function != null, "Failed to find ORDER-BY expression: %s in the GROUP-BY clause", expression); + if (function.getType() == FunctionContext.Type.AGGREGATION) { + // Aggregation function + return new AggregationFunctionExtractor(_aggregationFunctionIndexMap.get(function)); + } else { + // Post-aggregation function + return new PostAggregationFunctionExtractor(function); + } + } + /** * Constructs an IntermediateRecord from Record * The IntermediateRecord::key is the same Record::key * The IntermediateRecord::values contains only the order by columns, in the query's sort sequence * For aggregation values in the order by, the final result is extracted if the intermediate result is non-comparable */ - @VisibleForTesting - IntermediateRecord getIntermediateRecord(Key key, Record record) { + private IntermediateRecord getIntermediateRecord(Key key, Record record) { Comparable[] intermediateRecordValues = new Comparable[_numOrderByExpressions]; for (int i = 0; i < _numOrderByExpressions; i++) { intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record); @@ -118,8 +136,7 @@ IntermediateRecord getIntermediateRecord(Key key, Record record) { * Resize only if number of records is greater than trimToSize * The resizer smartly chooses to create PQ of records to evict or records to retain, based on the number of records and the number of records to evict */ - void resizeRecordsMap(Map recordsMap, int trimToSize) { - + public void resizeRecordsMap(Map 
recordsMap, int trimToSize) { int numRecordsToEvict = recordsMap.size() - trimToSize; if (numRecordsToEvict > 0) { @@ -127,9 +144,8 @@ void resizeRecordsMap(Map recordsMap, int trimToSize) { if (numRecordsToEvict < trimToSize) { // num records to evict is smaller than num records to retain // make PQ of records to evict - Comparator comparator = _intermediateRecordComparator; PriorityQueue priorityQueue = - convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, comparator); + convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, _intermediateRecordComparator); for (IntermediateRecord evictRecord : priorityQueue) { recordsMap.remove(evictRecord._key); } @@ -188,8 +204,7 @@ private List sortRecordsMap(Map recordsMap) { * If numRecordsToEvict > numRecordsToRetain, resize with PQ of records to evict, and then sort * Else, resize with PQ of record to retain, then use the PQ to create sorted list */ - List resizeAndSortRecordsMap(Map recordsMap, int trimToSize) { - + public List resizeAndSortRecordsMap(Map recordsMap, int trimToSize) { int numRecords = recordsMap.size(); if (numRecords == 0) { return Collections.emptyList(); @@ -236,8 +251,7 @@ List resizeAndSortRecordsMap(Map recordsMap, int trimToSize * 3. Inside the values, the columns should be ordered by the order by sequence * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable */ - @VisibleForTesting - static class IntermediateRecord { + private static class IntermediateRecord { final Key _key; final Comparable[] _values; @@ -248,49 +262,127 @@ static class IntermediateRecord { } /** - * Extractor for order by value columns from Record + * Extractor for the order-by value from a Record. */ - private static abstract class OrderByValueExtractor { - abstract Comparable extract(Record record); + private interface OrderByValueExtractor { + + /** + * Returns the ColumnDataType of the value extracted. 
+ */ + ColumnDataType getValueType(); + + /** + * Extracts the value from the given Record. + */ + Comparable extract(Record record); } /** - * Extractor for key column + * Extractor for a literal. */ - private static class KeyColumnExtractor extends OrderByValueExtractor { + private static class LiteralExtractor implements OrderByValueExtractor { + final String _literal; + + LiteralExtractor(String literal) { + _literal = literal; + } + + @Override + public ColumnDataType getValueType() { + return ColumnDataType.STRING; + } + + @Override + public String extract(Record record) { + return _literal; + } + } + + /** + * Extractor for a group-by expression. + */ + private class GroupByExpressionExtractor implements OrderByValueExtractor { final int _index; - KeyColumnExtractor(int index) { - _index = index; + GroupByExpressionExtractor(int groupByExpressionIndex) { + _index = groupByExpressionIndex; } @Override - Comparable extract(Record record) { - Object keyColumn = record.getValues()[_index]; - return (Comparable) keyColumn; + public ColumnDataType getValueType() { + return _dataSchema.getColumnDataType(_index); + } + + @Override + public Comparable extract(Record record) { + return (Comparable) record.getValues()[_index]; } } /** - * Extractor for aggregation column + * Extractor for an aggregation function. 
*/ - private static class AggregationColumnExtractor extends OrderByValueExtractor { + private class AggregationFunctionExtractor implements OrderByValueExtractor { final int _index; - final Function _convertorFunction; + final AggregationFunction _aggregationFunction; - AggregationColumnExtractor(int index, AggregationFunction aggregationFunction) { - _index = index; - if (aggregationFunction.isIntermediateResultComparable()) { - _convertorFunction = o -> (Comparable) o; - } else { - _convertorFunction = o -> aggregationFunction.extractFinalResult(o); + AggregationFunctionExtractor(int aggregationFunctionIndex) { + _index = aggregationFunctionIndex + _numGroupByExpressions; + _aggregationFunction = _aggregationFunctions[aggregationFunctionIndex]; + } + + @Override + public ColumnDataType getValueType() { + return _aggregationFunction.getFinalResultColumnType(); + } + + @Override + public Comparable extract(Record record) { + return _aggregationFunction.extractFinalResult(record.getValues()[_index]); + } + } + + /** + * Extractor for a post-aggregation function. 
+ */ + private class PostAggregationFunctionExtractor implements OrderByValueExtractor { + final Object[] _arguments; + final OrderByValueExtractor[] _argumentExtractors; + final PostAggregationFunction _postAggregationFunction; + + PostAggregationFunctionExtractor(FunctionContext function) { + assert function.getType() == FunctionContext.Type.TRANSFORM; + + List arguments = function.getArguments(); + int numArguments = arguments.size(); + _arguments = new Object[numArguments]; + _argumentExtractors = new OrderByValueExtractor[numArguments]; + ColumnDataType[] argumentTypes = new ColumnDataType[numArguments]; + for (int i = 0; i < numArguments; i++) { + OrderByValueExtractor argumentExtractor = getOrderByValueExtractor(arguments.get(i)); + _argumentExtractors[i] = argumentExtractor; + argumentTypes[i] = argumentExtractor.getValueType(); } + _postAggregationFunction = new PostAggregationFunction(function.getFunctionName(), argumentTypes); + } + + @Override + public ColumnDataType getValueType() { + return _postAggregationFunction.getResultType(); } @Override - Comparable extract(Record record) { - Object aggregationColumn = record.getValues()[_index]; - return _convertorFunction.apply(aggregationColumn); + public Comparable extract(Record record) { + int numArguments = _arguments.length; + for (int i = 0; i < numArguments; i++) { + _arguments[i] = _argumentExtractors[i].extract(record); + } + Object result = _postAggregationFunction.invoke(_arguments); + if (_postAggregationFunction.getResultType() == ColumnDataType.BYTES) { + return new ByteArray((byte[]) result); + } else { + return (Comparable) result; + } } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index ae7a6c99259..a2b3c76d057 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java 
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -140,9 +140,7 @@ public void runJob() { try { if (_dataSchema == null) { _dataSchema = intermediateResultsBlock.getDataSchema(); - _indexedTable = - new ConcurrentIndexedTable(_dataSchema, aggregationFunctions, _queryContext.getOrderByExpressions(), - _indexedTableCapacity); + _indexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, _indexedTableCapacity); } } finally { _initLock.unlock(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index c6786ab5b98..e164745b2e8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -261,9 +261,7 @@ private DataSchema getSQLResultTableSchema(DataSchema dataSchema) { private IndexedTable getIndexedTable(DataSchema dataSchema, Collection dataTables) { int indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext); - IndexedTable indexedTable = - new ConcurrentIndexedTable(dataSchema, _aggregationFunctions, _queryContext.getOrderByExpressions(), - indexedTableCapacity); + IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, indexedTableCapacity); for (DataTable dataTable : dataTables) { BiFunction[] functions = new BiFunction[_numColumns]; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java index 233c0973433..166af6db84f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Arrays; 
-import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; @@ -32,11 +31,7 @@ import java.util.concurrent.TimeoutException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.aggregation.function.MaxAggregationFunction; -import org.apache.pinot.core.query.aggregation.function.SumAggregationFunction; -import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.OrderByExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -52,13 +47,11 @@ public class IndexedTableTest { @Test public void testConcurrentIndexedTable() throws InterruptedException, TimeoutException, ExecutionException { + QueryContext queryContext = QueryContextConverterUtils + .getQueryContextFromSQL("SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3 ORDER BY SUM(m1)"); DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"}, new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); - AggregationFunction[] aggregationFunctions = new AggregationFunction[]{new SumAggregationFunction( - ExpressionContext.forIdentifier("m1")), new MaxAggregationFunction(ExpressionContext.forIdentifier("m2"))}; - List orderByExpressions = Collections - .singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("sum(m1)"), true)); - IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationFunctions, orderByExpressions, 5); + IndexedTable indexedTable = new 
ConcurrentIndexedTable(dataSchema, queryContext, 5); // 3 threads upsert together // a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times (20) @@ -121,71 +114,45 @@ public void testConcurrentIndexedTable() } @Test(dataProvider = "initDataProvider") - public void testNonConcurrentIndexedTable(List orderByExpressions, List survivors) { + public void testNonConcurrentIndexedTable(String orderBy, List survivors) { + QueryContext queryContext = QueryContextConverterUtils + .getQueryContextFromSQL("SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3, d4 ORDER BY " + orderBy); DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "d4", "sum(m1)", "max(m2)"}, new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); - AggregationFunction[] aggregationFunctions = new AggregationFunction[]{new SumAggregationFunction( - ExpressionContext.forIdentifier("m1")), new MaxAggregationFunction(ExpressionContext.forIdentifier("m2"))}; // Test SimpleIndexedTable - IndexedTable simpleIndexedTable = new SimpleIndexedTable(dataSchema, aggregationFunctions, orderByExpressions, 5); - // merge table - IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, aggregationFunctions, orderByExpressions, 10); - testNonConcurrent(simpleIndexedTable, mergeTable); - - // finish - simpleIndexedTable.finish(true); - checkSurvivors(simpleIndexedTable, survivors); + IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5); + IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10); + testNonConcurrent(indexedTable, mergeTable); + indexedTable.finish(true); + checkSurvivors(indexedTable, survivors); // Test ConcurrentIndexedTable - IndexedTable concurrentIndexedTable = - new ConcurrentIndexedTable(dataSchema, aggregationFunctions, orderByExpressions, 5); - mergeTable = new SimpleIndexedTable(dataSchema, 
aggregationFunctions, orderByExpressions, 10); - testNonConcurrent(concurrentIndexedTable, mergeTable); - - // finish - concurrentIndexedTable.finish(true); - checkSurvivors(concurrentIndexedTable, survivors); + indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5); + mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10); + testNonConcurrent(indexedTable, mergeTable); + indexedTable.finish(true); + checkSurvivors(indexedTable, survivors); } @DataProvider(name = "initDataProvider") public Object[][] initDataProvider() { List data = new ArrayList<>(); - List orderByExpressions; - List survivors; - // d1 desc - orderByExpressions = - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), false)); - survivors = Arrays.asList("m", "l", "k", "j", "i"); - data.add(new Object[]{orderByExpressions, survivors}); + data.add(new Object[]{"d1 DESC", Arrays.asList("m", "l", "k", "j", "i")}); // d1 asc - orderByExpressions = - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true)); - survivors = Arrays.asList("a", "b", "c", "d", "e"); - data.add(new Object[]{orderByExpressions, survivors}); + data.add(new Object[]{"d1", Arrays.asList("a", "b", "c", "d", "e")}); // sum(m1) desc, d1 asc - orderByExpressions = Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("sum(m1)"), false), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true)); - survivors = Arrays.asList("m", "h", "i", "a", "b"); - data.add(new Object[]{orderByExpressions, survivors}); + data.add(new Object[]{"SUM(m1) DESC, d1", Arrays.asList("m", "h", "i", "a", "b")}); // d2 desc - orderByExpressions = - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d2"), false)); - survivors = Arrays.asList("m", "l", "k", "j", "i"); - data.add(new Object[]{orderByExpressions, survivors}); + 
data.add(new Object[]{"d2 DESC", Arrays.asList("m", "l", "k", "j", "i")}); // d4 asc, d1 asc - orderByExpressions = Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d4"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true)); - survivors = Arrays.asList("a", "b", "c", "d", "e"); - data.add(new Object[]{orderByExpressions, survivors}); + data.add(new Object[]{"d4, d1 ASC", Arrays.asList("a", "b", "c", "d", "e")}); return data.toArray(new Object[data.size()][]); } @@ -271,15 +238,15 @@ private Record getRecord(Object[] columns) { @Test public void testNoMoreNewRecords() { + QueryContext queryContext = + QueryContextConverterUtils.getQueryContextFromSQL("SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3"); DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"}, new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); - AggregationFunction[] aggregationFunctions = new AggregationFunction[]{new SumAggregationFunction( - ExpressionContext.forIdentifier("m1")), new MaxAggregationFunction(ExpressionContext.forIdentifier("m2"))}; - IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, aggregationFunctions, null, 5); + IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5); testNoMoreNewRecordsInTable(indexedTable); - indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationFunctions, null, 5); + indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5); testNoMoreNewRecordsInTable(indexedTable); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java index a3abedbed33..f0358a77006 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java @@ -18,205 +18,194 @@ */ package org.apache.pinot.core.data.table; -import com.google.common.collect.Lists; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.aggregation.function.AvgAggregationFunction; -import org.apache.pinot.core.query.aggregation.function.DistinctCountAggregationFunction; -import org.apache.pinot.core.query.aggregation.function.MaxAggregationFunction; -import org.apache.pinot.core.query.aggregation.function.SumAggregationFunction; import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair; -import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.OrderByExpressionContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; -import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + /** * Tests the functionality of {@link @TableResizer} */ -@SuppressWarnings({"rawtypes", "unchecked"}) public class TableResizerTest { - private DataSchema _dataSchema; - private AggregationFunction[] _aggregationFunctions; + private static final String QUERY_PREFIX = + "SELECT SUM(m1), MAX(m2), DISTINCTCOUNT(m3), AVG(m4) FROM testTable GROUP BY d1, d2, d3 ORDER BY "; + private static final DataSchema DATA_SCHEMA = + new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", "distinctcount(m3)", "avg(m4)"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT}); + private static final int TRIM_TO_SIZE = 3; - private int trimToSize = 3; private Map _recordsMap; private List _records; private List _keys; @BeforeClass public void setUp() { - _dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", "distinctcount(m3)", "avg(m4)"}, - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT}); - _aggregationFunctions = new AggregationFunction[]{new SumAggregationFunction( - ExpressionContext.forIdentifier("m1")), new MaxAggregationFunction( - ExpressionContext.forIdentifier("m2")), new DistinctCountAggregationFunction( - ExpressionContext.forIdentifier("m3")), new AvgAggregationFunction(ExpressionContext.forIdentifier("m4"))}; - - IntOpenHashSet i1 = new IntOpenHashSet(); - i1.add(1); - IntOpenHashSet i2 = new IntOpenHashSet(); - i2.add(1); - i2.add(2); - IntOpenHashSet i3 = new IntOpenHashSet(); - i3.add(1); - i3.add(2); - IntOpenHashSet i4 = new IntOpenHashSet(); - i4.add(1); - i4.add(2); - i4.add(3); - IntOpenHashSet i5 = new IntOpenHashSet(); - i5.add(1); - i5.add(2); - i5.add(3); - i5.add(4); - _records = Lists.newArrayList(new Record(new Object[]{"a", 10, 1.0, 10, 100, i1, new AvgPair(10, 2) /* 5 */}), - new Record(new Object[]{"b", 10, 2.0, 20, 200, i2, new AvgPair(10, 3) /* 3.33 */}), - new Record(new Object[]{"c", 200, 3.0, 30, 300, i3, new AvgPair(20, 4) /* 5 */}), - new Record(new Object[]{"c", 50, 4.0, 30, 200, i4, new AvgPair(30, 10) /* 3 */}), - new Record(new Object[]{"c", 300, 5.0, 20, 100, i5, new AvgPair(10, 5) /* 2 */})); - - _keys = Lists.newArrayList(new Key(new Object[]{"a", 10, 1.0}), new Key(new Object[]{"b", 10, 2.0}), - new Key(new 
Object[]{"c", 200, 3.0}), new Key(new Object[]{"c", 50, 4.0}), - new Key(new Object[]{"c", 300, 5.0})); + //@formatter:off + _records = Arrays.asList( + new Record(new Object[]{"a", 10, 1.0, 10.0, 100.0, new IntOpenHashSet(new int[]{1}), new AvgPair(10, 2) /* 5 */}), + new Record(new Object[]{"b", 10, 2.0, 20.0, 200.0, new IntOpenHashSet(new int[]{1, 2}), new AvgPair(10, 3) /* 3.33 */}), + new Record(new Object[]{"c", 200, 3.0, 30.0, 300.0, new IntOpenHashSet(new int[]{1, 2}), new AvgPair(20, 4) /* 5 */}), + new Record(new Object[]{"c", 50, 4.0, 30.0, 200.0, new IntOpenHashSet(new int[]{1, 2, 3}), new AvgPair(30, 10) /* 3 */}), + new Record(new Object[]{"c", 300, 5.0, 20.0, 100.0, new IntOpenHashSet(new int[]{1, 2, 3, 4}), new AvgPair(10, 5) /* 2 */}) + ); + _keys = Arrays.asList( + new Key(new Object[]{"a", 10, 1.0}), + new Key(new Object[]{"b", 10, 2.0}), + new Key(new Object[]{"c", 200, 3.0}), + new Key(new Object[]{"c", 50, 4.0}), + new Key(new Object[]{"c", 300, 5.0}) + ); + //@formatter:on _recordsMap = new HashMap<>(); - for (int i = 0; i < _records.size(); i++) { + int numRecords = _records.size(); + for (int i = 0; i < numRecords; i++) { _recordsMap.put(_keys.get(i), _records.get(i)); } - /*_recordsMap = new HashMap<>(); - _recordsMap.put(new Key(new Object[]{"a", 10, 1.0}), new Record(new Object[]{"a", 10, 1.0,10, 100, i1, new AvgPair(10, 2) *//* 5 *//*})); - _recordsMap.put(new Key(new Object[]{"b", 10, 2.0}), new Record(new Object[]{"b", 10, 2.0,20, 200, i2, new AvgPair(10, 3) *//* 3.33 *//*})); - _recordsMap.put(new Key(new Object[]{"c", 200, 3.0}),new Record(new Object[]{"c", 200, 3.0,30, 300, i3, new AvgPair(20, 4) *//* 5 *//*})); - _recordsMap.put(new Key(new Object[]{"c", 50, 4.0}), new Record(new Object[]{"c", 50, 4.0,30, 200, i4, new AvgPair(30, 10) *//* 3 *//*})); - _recordsMap.put(new Key(new Object[]{"c", 300, 5.0}), new Record(new Object[]{"c", 300, 5.0,20, 100, i5, new AvgPair(10, 5) *//* 2 *//*})); - */ } - /** - * {@link TableResizer} 
resizes the _records map based on SelectionSort - */ @Test public void testResizeRecordsMap() { - Map recordsMap; // Test resize algorithm with numRecordsToEvict < trimToSize. // TotalRecords=5; trimToSize=3; numRecordsToEvict=2 // d1 asc - recordsMap = new HashMap<>(_recordsMap); - TableResizer tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true))); - tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b, c - Assert.assertTrue(recordsMap.containsKey(_keys.get(1))); + TableResizer tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1")); + Map recordsMap = new HashMap<>(_recordsMap); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b + assertTrue(recordsMap.containsKey(_keys.get(1))); // d1 desc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), false))); + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1 DESC")); recordsMap = new HashMap<>(_recordsMap); - tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(2))); // c, c, c - Assert.assertTrue(recordsMap.containsKey(_keys.get(3))); - Assert.assertTrue(recordsMap.containsKey(_keys.get(4))); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(2))); // c, c, c + 
assertTrue(recordsMap.containsKey(_keys.get(3))); + assertTrue(recordsMap.containsKey(_keys.get(4))); // d1 asc, d3 desc (tie breaking with 2nd comparator) - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d3"), false))); - recordsMap = new HashMap<>(_recordsMap); - tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(0))); // 10, 10, 300 - Assert.assertTrue(recordsMap.containsKey(_keys.get(1))); - Assert.assertTrue(recordsMap.containsKey(_keys.get(4))); - - // d2 asc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d2"), true))); + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, d3 DESC")); recordsMap = new HashMap<>(_recordsMap); - tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(0))); // 10, 10, 50 - Assert.assertTrue(recordsMap.containsKey(_keys.get(1))); - Assert.assertTrue(recordsMap.containsKey(_keys.get(3))); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b, c (300) + assertTrue(recordsMap.containsKey(_keys.get(1))); + assertTrue(recordsMap.containsKey(_keys.get(4))); // d1 asc, sum(m1) desc, max(m2) desc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true), - new 
OrderByExpressionContext(QueryContextConverterUtils.getExpression("sum(m1)"), false), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("max(m2)"), false))); + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, SUM(m1) DESC, max(m2) DESC")); recordsMap = new HashMap<>(_recordsMap); - tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b, (c (30, 300)) - Assert.assertTrue(recordsMap.containsKey(_keys.get(1))); - Assert.assertTrue(recordsMap.containsKey(_keys.get(2))); - - // object type avg(m4) asc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Collections - .singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("avg(m4)"), true))); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b, c (30, 300) + assertTrue(recordsMap.containsKey(_keys.get(1))); + assertTrue(recordsMap.containsKey(_keys.get(2))); + + // avg(m4) asc (object type) + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)")); recordsMap = new HashMap<>(_recordsMap); - tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3, 3.33, - Assert.assertTrue(recordsMap.containsKey(_keys.get(3))); - Assert.assertTrue(recordsMap.containsKey(_keys.get(1))); - - // non-comparable intermediate result - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("distinctcount(m3)"), false), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), 
true))); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3, 3.33 + assertTrue(recordsMap.containsKey(_keys.get(3))); + assertTrue(recordsMap.containsKey(_keys.get(1))); + + // distinctcount(m3) desc, d1 asc (non-comparable intermediate result) + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, d1")); recordsMap = new HashMap<>(_recordsMap); - tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(4))); // 6, 5, 4 (b) - Assert.assertTrue(recordsMap.containsKey(_keys.get(3))); - Assert.assertTrue(recordsMap.containsKey(_keys.get(1))); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(4))); // 4, 3, 2 (b) + assertTrue(recordsMap.containsKey(_keys.get(3))); + assertTrue(recordsMap.containsKey(_keys.get(1))); + + // d2 + d3 asc (post-aggregation) + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 + d3")); + recordsMap = new HashMap<>(_recordsMap); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(0))); // 11, 12, 54 + assertTrue(recordsMap.containsKey(_keys.get(1))); + assertTrue(recordsMap.containsKey(_keys.get(3))); + + // sum(m1) * d3 desc (post-aggregation) + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "SUM(m1) * d3 DESC")); + recordsMap = new HashMap<>(_recordsMap); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + 
assertTrue(recordsMap.containsKey(_keys.get(3))); // 120, 100, 90 + assertTrue(recordsMap.containsKey(_keys.get(4))); + assertTrue(recordsMap.containsKey(_keys.get(2))); + + // d2 / (distinctcount(m3) + 1) asc, d1 desc (post-aggregation) + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / (DISTINCTCOUNT(m3) + 1), d1 DESC")); + recordsMap = new HashMap<>(_recordsMap); + tableResizer.resizeRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(recordsMap.size(), TRIM_TO_SIZE); + assertTrue(recordsMap.containsKey(_keys.get(1))); // 3.33, 12.5, 5 + assertTrue(recordsMap.containsKey(_keys.get(0))); + assertTrue(recordsMap.containsKey(_keys.get(3))); // Test resize algorithm with numRecordsToEvict > trimToSize. // TotalRecords=5; trimToSize=2; numRecordsToEvict=3 - trimToSize = 2; + int trimToSize = 2; // d1 asc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true))); + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1")); recordsMap = new HashMap<>(_recordsMap); tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b - Assert.assertTrue(recordsMap.containsKey(_keys.get(1))); + assertEquals(recordsMap.size(), trimToSize); + assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b + assertTrue(recordsMap.containsKey(_keys.get(1))); - // object type avg(m4) asc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Collections - .singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("avg(m4)"), true))); + // avg(m4) asc (object type) + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + 
"AVG(m4)")); recordsMap = new HashMap<>(_recordsMap); tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3, 3.33, - Assert.assertTrue(recordsMap.containsKey(_keys.get(3))); + assertEquals(recordsMap.size(), trimToSize); + assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3 + assertTrue(recordsMap.containsKey(_keys.get(3))); - // non-comparable intermediate result - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("distinctcount(m3)"), false), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true))); + // distinctcount(m3) desc, d1 asc (non-comparable intermediate result) + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, d1")); recordsMap = new HashMap<>(_recordsMap); tableResizer.resizeRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(recordsMap.size(), trimToSize); - Assert.assertTrue(recordsMap.containsKey(_keys.get(4))); // 6, 5, 4 (b) - Assert.assertTrue(recordsMap.containsKey(_keys.get(3))); + assertEquals(recordsMap.size(), trimToSize); + assertTrue(recordsMap.containsKey(_keys.get(4))); // 4, 3 + assertTrue(recordsMap.containsKey(_keys.get(3))); - // Reset trimToSize - trimToSize = 3; + // d2 / (distinctcount(m3) + 1) asc, d1 desc (post-aggregation) + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / (DISTINCTCOUNT(m3) + 1), d1 DESC")); + recordsMap = new HashMap<>(_recordsMap); + tableResizer.resizeRecordsMap(recordsMap, trimToSize); + assertEquals(recordsMap.size(), trimToSize); + assertTrue(recordsMap.containsKey(_keys.get(1))); // 3.33, 12.5 + assertTrue(recordsMap.containsKey(_keys.get(0))); } /** @@ -224,170 +213,80 @@ public void 
testResizeRecordsMap() { */ @Test public void testResizeAndSortRecordsMap() { - List sortedRecords; - int[] order; - Map recordsMap; - // d1 asc - TableResizer tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true))); - recordsMap = new HashMap<>(_recordsMap); - sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(sortedRecords.size(), trimToSize); - order = new int[]{0, 1}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } + TableResizer tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1")); + Map recordsMap = new HashMap<>(_recordsMap); + List sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(sortedRecords.size(), TRIM_TO_SIZE); + assertEquals(sortedRecords.get(0), _records.get(0)); // a, b + assertEquals(sortedRecords.get(1), _records.get(1)); // d1 asc - trim to 1 recordsMap = new HashMap<>(_recordsMap); sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1); - Assert.assertEquals(sortedRecords.size(), 1); - order = new int[]{0}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } + assertEquals(sortedRecords.get(0), _records.get(0)); // a // d1 asc, d3 desc (tie breaking with 2nd comparator) - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d3"), false))); + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, d3 DESC")); recordsMap = new HashMap<>(_recordsMap); - sortedRecords = 
tableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(sortedRecords.size(), trimToSize); - order = new int[]{0, 1, 4}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } + sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(sortedRecords.size(), TRIM_TO_SIZE); + assertEquals(sortedRecords.get(0), _records.get(0)); // a, b, c (300) + assertEquals(sortedRecords.get(1), _records.get(1)); + assertEquals(sortedRecords.get(2), _records.get(4)); - // d1 asc, d3 desc (tie breaking with 2nd comparator) - trim 1 + // d1 asc, d3 desc (tie breaking with 2nd comparator) - trim to 1 recordsMap = new HashMap<>(_recordsMap); sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1); - Assert.assertEquals(sortedRecords.size(), 1); - order = new int[]{0}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } + assertEquals(sortedRecords.size(), 1); + assertEquals(sortedRecords.get(0), _records.get(0)); // a // d1 asc, sum(m1) desc, max(m2) desc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("sum(m1)"), false), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("max(m2)"), false))); + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, SUM(m1) DESC, max(m2) DESC")); recordsMap = new HashMap<>(_recordsMap); - sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize); - Assert.assertEquals(sortedRecords.size(), trimToSize); - order = new int[]{0, 1, 2}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } + sortedRecords = 
tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(sortedRecords.size(), TRIM_TO_SIZE); + assertEquals(sortedRecords.get(0), _records.get(0)); // a, b, c (30, 300) + assertEquals(sortedRecords.get(1), _records.get(1)); + assertEquals(sortedRecords.get(2), _records.get(2)); - // trim 1 + // d1 asc, sum(m1) desc, max(m2) desc - trim to 1 recordsMap = new HashMap<>(_recordsMap); sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1); - Assert.assertEquals(sortedRecords.size(), 1); - order = new int[]{0}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } + assertEquals(sortedRecords.size(), 1); + assertEquals(sortedRecords.get(0), _records.get(0)); // a - // object type avg(m4) asc - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("avg(m4)"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true))); + // avg(m4) asc (object type) + tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)")); recordsMap = new HashMap<>(_recordsMap); - sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 10); // high trim to size - Assert.assertEquals(sortedRecords.size(), recordsMap.size()); - order = new int[]{4, 3, 1, 0, 2}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } - - // non-comparable intermediate result - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("distinctcount(m3)"), false), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("avg(m4)"), false))); + sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE); + 
assertEquals(sortedRecords.size(), TRIM_TO_SIZE); + assertEquals(sortedRecords.get(0), _records.get(4)); // 2, 3, 3.33 + assertEquals(sortedRecords.get(1), _records.get(3)); + assertEquals(sortedRecords.get(2), _records.get(1)); + + // distinctcount(m3) desc, d1 asc (non-comparable intermediate result) + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, d1")); recordsMap = new HashMap<>(_recordsMap); - sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, recordsMap.size()); // equal trim to size - Assert.assertEquals(sortedRecords.size(), recordsMap.size()); - order = new int[]{4, 3, 2, 1, 0}; - for (int i = 0; i < order.length; i++) { - Assert.assertEquals(sortedRecords.get(i), _records.get(order[i])); - } - } - - /** - * Tests the conversion of {@link Record} to {@link TableResizer.IntermediateRecord} - */ - @Test - public void testIntermediateRecord() { - - // d2 - TableResizer tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, - Collections.singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d2"), true))); - for (Map.Entry entry : _recordsMap.entrySet()) { - Key key = entry.getKey(); - Record record = entry.getValue(); - TableResizer.IntermediateRecord intermediateRecord = tableResizer.getIntermediateRecord(key, record); - Assert.assertEquals(intermediateRecord._key, key); - Assert.assertEquals(intermediateRecord._values.length, 1); - Assert.assertEquals(intermediateRecord._values[0], record.getValues()[1]); - } - - // sum(m1) - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Collections - .singletonList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("sum(m1)"), true))); - for (Map.Entry entry : _recordsMap.entrySet()) { - Key key = entry.getKey(); - Record record = entry.getValue(); - TableResizer.IntermediateRecord intermediateRecord = 
tableResizer.getIntermediateRecord(key, record); - Assert.assertEquals(intermediateRecord._key, key); - Assert.assertEquals(intermediateRecord._values.length, 1); - Assert.assertEquals(intermediateRecord._values[0], record.getValues()[3]); - } - - // d1, max(m2) - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d1"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("max(m2)"), true))); - for (Map.Entry entry : _recordsMap.entrySet()) { - Key key = entry.getKey(); - Record record = entry.getValue(); - TableResizer.IntermediateRecord intermediateRecord = tableResizer.getIntermediateRecord(key, record); - Assert.assertEquals(intermediateRecord._key, key); - Assert.assertEquals(intermediateRecord._values.length, 2); - Assert.assertEquals(intermediateRecord._values[0], record.getValues()[0]); - Assert.assertEquals(intermediateRecord._values[1], record.getValues()[4]); - } - - // d2, sum(m1), d3 - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Arrays - .asList(new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d2"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("sum(m1)"), true), - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("d3"), true))); - for (Map.Entry entry : _recordsMap.entrySet()) { - Key key = entry.getKey(); - Record record = entry.getValue(); - TableResizer.IntermediateRecord intermediateRecord = tableResizer.getIntermediateRecord(key, record); - Assert.assertEquals(intermediateRecord._key, key); - Assert.assertEquals(intermediateRecord._values.length, 3); - Assert.assertEquals(intermediateRecord._values[0], record.getValues()[1]); - Assert.assertEquals(intermediateRecord._values[1], record.getValues()[3]); - Assert.assertEquals(intermediateRecord._values[2], record.getValues()[2]); - } - - // non-comparable intermediate 
result - tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, Collections.singletonList( - new OrderByExpressionContext(QueryContextConverterUtils.getExpression("distinctcount(m3)"), true))); - AggregationFunction distinctCountFunction = _aggregationFunctions[2]; - for (Map.Entry entry : _recordsMap.entrySet()) { - Key key = entry.getKey(); - Record record = entry.getValue(); - TableResizer.IntermediateRecord intermediateRecord = tableResizer.getIntermediateRecord(key, record); - Assert.assertEquals(intermediateRecord._key, key); - Assert.assertEquals(intermediateRecord._values.length, 1); - Assert - .assertEquals(intermediateRecord._values[0], distinctCountFunction.extractFinalResult(record.getValues()[5])); - } + sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(sortedRecords.size(), TRIM_TO_SIZE); + assertEquals(sortedRecords.get(0), _records.get(4)); // 4, 3, 2 (b) + assertEquals(sortedRecords.get(1), _records.get(3)); + assertEquals(sortedRecords.get(2), _records.get(1)); + + // d2 / (distinctcount(m3) + 1) asc, d1 desc (post-aggregation) + tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / (DISTINCTCOUNT(m3) + 1), d1 DESC")); + recordsMap = new HashMap<>(_recordsMap); + sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE); + assertEquals(sortedRecords.size(), TRIM_TO_SIZE); + assertEquals(sortedRecords.get(0), _records.get(1)); // 3.33, 12.5, 5 + assertEquals(sortedRecords.get(1), _records.get(0)); + assertEquals(sortedRecords.get(2), _records.get(3)); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index 25cf9236137..d02e19d1285 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -225,6 +225,12 @@ public void testHardcodedSqlQueries() testSqlQuery(query, h2queries); query = "SELECT COUNT(*) AS \"date\", MAX(ArrTime) AS \"group\", MIN(ArrTime) AS min FROM myTable"; testSqlQuery(query, Collections.singletonList(query)); + + // Post-aggregation in ORDER-BY + query = "SELECT MAX(ArrTime) FROM mytable GROUP BY DaysSinceEpoch ORDER BY MAX(ArrTime) - MIN(ArrTime)"; + testSqlQuery(query, Collections.singletonList(query)); + query = "SELECT MAX(ArrDelay), Month FROM mytable GROUP BY Month ORDER BY ABS(Month - 6) + MAX(ArrDelay)"; + testSqlQuery(query, Collections.singletonList(query)); } /** diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java index d7682823f28..65b32286fb9 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java @@ -65,13 +65,12 @@ @State(Scope.Benchmark) @Fork(value = 1, jvmArgs = {"-server", "-Xmx8G", "-XX:MaxDirectMemorySize=16G"}) public class BenchmarkCombineGroupBy { - private static final int TOP_N = 500; private static final int NUM_SEGMENTS = 4; private static final int NUM_RECORDS_PER_SEGMENT = 100_000; private static final int CARDINALITY_D1 = 500; private static final int CARDINALITY_D2 = 500; - private Random _random = new Random(); + private static final Random RANDOM = new Random(); private QueryContext _queryContext; private AggregationFunction[] _aggregationFunctions; @@ -115,15 +114,15 @@ public void destroy() { private Record getRecord() { Object[] columns = - new Object[]{_d1.get(_random.nextInt(_d1.size())), _d2.get(_random.nextInt(_d2.size())), (double) _random - 
.nextInt(1000), (double) _random.nextInt(1000)}; + new Object[]{_d1.get(RANDOM.nextInt(_d1.size())), _d2.get(RANDOM.nextInt(_d2.size())), (double) RANDOM + .nextInt(1000), (double) RANDOM.nextInt(1000)}; return new Record(columns); } private Pair getOriginalRecord() { String stringKey = Joiner.on(GroupKeyGenerator.DELIMITER) - .join(_d1.get(_random.nextInt(_d1.size())), _d2.get(_random.nextInt(_d2.size()))); - Object[] values = new Object[]{(double) _random.nextInt(1000), (double) _random.nextInt(1000)}; + .join(_d1.get(RANDOM.nextInt(_d1.size())), _d2.get(RANDOM.nextInt(_d2.size()))); + Object[] values = new Object[]{(double) RANDOM.nextInt(1000), (double) RANDOM.nextInt(1000)}; return new Pair<>(stringKey, values); } @@ -135,8 +134,7 @@ public void concurrentIndexedTableForCombineGroupBy() int capacity = GroupByUtils.getTableCapacity(_queryContext); // make 1 concurrent table - IndexedTable concurrentIndexedTable = - new ConcurrentIndexedTable(_dataSchema, _aggregationFunctions, _queryContext.getOrderByExpressions(), capacity); + IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, capacity); List> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS); diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java index 3d9d0b77955..4100420ef89 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java @@ -37,7 +37,6 @@ import org.apache.pinot.core.data.table.IndexedTable; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.SimpleIndexedTable; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import 
org.apache.pinot.core.util.trace.TraceRunnable; @@ -57,13 +56,11 @@ @State(Scope.Benchmark) public class BenchmarkIndexedTable { - - private int CAPACITY = 800; - private int NUM_RECORDS = 1000; - private Random _random = new Random(); + private static final int CAPACITY = 800; + private static final int NUM_RECORDS = 1000; + private static final Random RANDOM = new Random(); private QueryContext _queryContext; - private AggregationFunction[] _aggregationFunctions; private DataSchema _dataSchema; private List _d1; @@ -90,8 +87,6 @@ public void setup() { _queryContext = QueryContextConverterUtils .getQueryContextFromPQL("SELECT sum(m1), max(m2) FROM testTable GROUP BY d1, d2 ORDER BY sum(m1) TOP 500"); - _aggregationFunctions = _queryContext.getAggregationFunctions(); - assert _aggregationFunctions != null; _dataSchema = new DataSchema(new String[]{"d1", "d2", "sum(m1)", "max(m2)"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}); @@ -105,8 +100,8 @@ public void destroy() { private Record getNewRecord() { Object[] columns = - new Object[]{_d1.get(_random.nextInt(_d1.size())), _d2.get(_random.nextInt(_d2.size())), (double) _random - .nextInt(1000), (double) _random.nextInt(1000)}; + new Object[]{_d1.get(RANDOM.nextInt(_d1.size())), _d2.get(RANDOM.nextInt(_d2.size())), (double) RANDOM + .nextInt(1000), (double) RANDOM.nextInt(1000)}; return new Record(columns); } @@ -118,8 +113,7 @@ public void concurrentIndexedTable() int numSegments = 10; // make 1 concurrent table - IndexedTable concurrentIndexedTable = - new ConcurrentIndexedTable(_dataSchema, _aggregationFunctions, _queryContext.getOrderByExpressions(), CAPACITY); + IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, CAPACITY); // 10 parallel threads putting 10k records into the table @@ -167,8 +161,7 @@ public void simpleIndexedTable() for (int i = 0; i < 
numSegments; i++) { // make 10 indexed tables - IndexedTable simpleIndexedTable = - new SimpleIndexedTable(_dataSchema, _aggregationFunctions, _queryContext.getOrderByExpressions(), CAPACITY); + IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, _queryContext, CAPACITY); simpleIndexedTables.add(simpleIndexedTable); // put 10k records in each indexed table, in parallel