
[Post-Aggregation] Support post-aggregation in ORDER-BY #5856

Merged (1 commit) on Aug 17, 2020

Changes from all commits:

BaseTable.java

@@ -19,68 +19,27 @@
package org.apache.pinot.core.data.table;

import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;


/**
* Base abstract implementation of Table
*/
@SuppressWarnings("rawtypes")
public abstract class BaseTable implements Table {
protected final DataSchema _dataSchema;
protected final int _numColumns;
protected final AggregationFunction[] _aggregationFunctions;
protected final int _numAggregations;

// the capacity we need to trim to
protected int _capacity;
// the capacity with added buffer, in order to collect more records than capacity for better precision
protected int _maxCapacity;

protected boolean _isOrderBy;
protected TableResizer _tableResizer;

/**
* Initializes the variables and comparators needed for the table
*/
public BaseTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions,
@Nullable List<OrderByExpressionContext> orderByExpressions, int capacity) {
protected BaseTable(DataSchema dataSchema) {
_dataSchema = dataSchema;
_numColumns = dataSchema.size();
_aggregationFunctions = aggregationFunctions;
_numAggregations = aggregationFunctions.length;
addCapacityAndOrderByInfo(orderByExpressions, capacity);
}

protected void addCapacityAndOrderByInfo(@Nullable List<OrderByExpressionContext> orderByExpressions, int capacity) {
if (orderByExpressions != null) {
_isOrderBy = true;
_tableResizer = new TableResizer(_dataSchema, _aggregationFunctions, orderByExpressions);

// TODO: tune these numbers and come up with a better formula (github ISSUE-4801)
// Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain records from the PQ
if (capacity <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize
_maxCapacity = 1_000_000;
} else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize
_maxCapacity = (int) (capacity * 1.2);
}
} else {
_isOrderBy = false;
_tableResizer = null;
_maxCapacity = capacity;
}
_capacity = capacity;
}

@Override
public boolean merge(Table table) {
Iterator<Record> iterator = table.iterator();
while (iterator.hasNext()) {
// NOTE: For performance concerns, this does not check the return value of upsert(). Override this method if
// upsert() can return false.
upsert(iterator.next());
}
return true;
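
With this refactoring BaseTable keeps only the schema bookkeeping; the aggregation functions, capacity buffers, and
order-by state move down into IndexedTable, and callers now build tables from the QueryContext instead of passing
aggregation functions and order-by expressions separately. A minimal sketch of the new construction path (illustrative
only, not code from this PR: the query, the schema, and the use of QueryContextConverterUtils are assumptions):

// Assumes pinot-core imports: DataSchema, QueryContext, QueryContextConverterUtils, SimpleIndexedTable, Table.
// The ORDER-BY below is a post-aggregation expression, the case this PR enables.
QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
    "SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1 ORDER BY SUM(m1) + MAX(m2) DESC");
DataSchema dataSchema = new DataSchema(new String[]{"d1", "sum(m1)", "max(m2)"},
    new DataSchema.ColumnDataType[]{
        DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
Table table = new SimpleIndexedTable(dataSchema, queryContext, 5_000);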

ConcurrentIndexedTable.java

@@ -27,10 +27,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,24 +39,16 @@
public class ConcurrentIndexedTable extends IndexedTable {
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIndexedTable.class);

private ConcurrentMap<Key, Record> _lookupMap;
private ReentrantReadWriteLock _readWriteLock;
private final ConcurrentMap<Key, Record> _lookupMap;
private final ReentrantReadWriteLock _readWriteLock;
private Iterator<Record> _iterator;

private AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
private final AtomicInteger _numResizes = new AtomicInteger();
private final AtomicLong _resizeTime = new AtomicLong();

/**
* Initializes the data structures needed for this Table
* @param dataSchema data schema of the record's keys and values
* @param aggregationFunctions aggregation functions for the record's values
* @param orderByExpressions list of {@link OrderByExpressionContext} defining the order by
* @param capacity the capacity of the table
*/
public ConcurrentIndexedTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions,
@Nullable List<OrderByExpressionContext> orderByExpressions, int capacity) {
super(dataSchema, aggregationFunctions, orderByExpressions, capacity);
public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
super(dataSchema, queryContext, capacity);

_lookupMap = new ConcurrentHashMap<>();
_readWriteLock = new ReentrantReadWriteLock();
@@ -105,7 +95,7 @@ public boolean upsert(Key key, Record newRecord) {

// resize if exceeds max capacity
if (_lookupMap.size() >= _maxCapacity) {
if (_isOrderBy) {
if (_hasOrderBy) {
// reached capacity, resize
_readWriteLock.writeLock().lock();
try {
@@ -165,7 +155,7 @@ private List<Record> resizeAndSort(int trimToSize) {
@Override
public void finish(boolean sort) {

if (_isOrderBy) {
if (_hasOrderBy) {

if (sort) {
List<Record> sortedRecords = resizeAndSort(_capacity);
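
The ConcurrentIndexedTable hunks above use the ReentrantReadWriteLock in an inverted fashion: the hot upsert path
holds the read lock, so many threads may mutate the ConcurrentHashMap at once, while the occasional trim takes the
write lock (visible in the _hasOrderBy branch) so the map is quiescent while it is resized. A self-contained sketch of
that pattern, with String standing in for Key and Record (hypothetical code, not from this PR):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class BoundedConcurrentMap {
  private final ConcurrentMap<String, String> _map = new ConcurrentHashMap<>();
  private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
  private final int _maxCapacity;

  BoundedConcurrentMap(int maxCapacity) {
    _maxCapacity = maxCapacity;
  }

  void upsert(String key, String value) {
    _lock.readLock().lock(); // shared: concurrent upserts may proceed together
    try {
      _map.put(key, value);
    } finally {
      _lock.readLock().unlock();
    }
    if (_map.size() >= _maxCapacity) {
      _lock.writeLock().lock(); // exclusive: no upsert runs while trimming
      try {
        if (_map.size() >= _maxCapacity) { // re-check: another thread may have trimmed first
          trim();
        }
      } finally {
        _lock.writeLock().unlock();
      }
    }
  }

  private void trim() {
    // Evict entries down to the target capacity; the eviction policy (lowest-ranked
    // first, as TableResizer does for order-by) is omitted from this sketch.
  }
}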

IndexedTable.java

@@ -20,54 +20,64 @@

import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;


/**
* Base implementation of Map-based Table for indexed lookup
*/
@SuppressWarnings("rawtypes")
public abstract class IndexedTable extends BaseTable {
private final KeyExtractor _keyExtractor;
final int _numKeyColumns;
protected final int _numKeyColumns;
protected final AggregationFunction[] _aggregationFunctions;
protected final boolean _hasOrderBy;
protected final TableResizer _tableResizer;

/**
* Initializes the variables and comparators needed for the table
*/
IndexedTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions,
@Nullable List<OrderByExpressionContext> orderByExpressions, int capacity) {
super(dataSchema, aggregationFunctions, orderByExpressions, capacity);
// The capacity we need to trim to
protected final int _capacity;
// The capacity with added buffer, in order to collect more records than capacity for better precision
protected final int _maxCapacity;

_numKeyColumns = dataSchema.size() - _numAggregations;
_keyExtractor = new KeyExtractor(_numKeyColumns);
}
protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
super(dataSchema);

@Override
public boolean upsert(Record newRecord) {
Key key = _keyExtractor.extractKey(newRecord);
return upsert(key, newRecord);
}
List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
assert groupByExpressions != null;
_numKeyColumns = groupByExpressions.size();

/**
* Extractor for key component of a Record
* The assumption is that the keys will always be before the aggregations in the Record. It is the caller's responsibility to ensure that.
* This will help us avoid index lookups, while extracting keys, and also while aggregating the values
*/
private static class KeyExtractor {
private int _keyIndexes;
_aggregationFunctions = queryContext.getAggregationFunctions();

KeyExtractor(int keyIndexes) {
_keyIndexes = keyIndexes;
}
List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
if (orderByExpressions != null) {
_hasOrderBy = true;
_tableResizer = new TableResizer(dataSchema, queryContext);
_capacity = capacity;

/**
* Returns the Key from the Record
*/
Key extractKey(Record record) {
Object[] keys = Arrays.copyOf(record.getValues(), _keyIndexes);
return new Key(keys);
// TODO: tune these numbers and come up with a better formula (github ISSUE-4801)
// Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain records from the PQ
if (capacity <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize
_maxCapacity = 1_000_000;
} else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize
_maxCapacity = (int) (capacity * 1.2);
}
} else {
_hasOrderBy = false;
_tableResizer = null;
_capacity = capacity;
_maxCapacity = capacity;
}
}

@Override
public boolean upsert(Record record) {
// NOTE: The record will always have key columns (group-by expressions) in the front. This is handled in
// AggregationGroupByOrderByOperator.
Object[] keyValues = Arrays.copyOf(record.getValues(), _numKeyColumns);
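// e.g. with _numKeyColumns = 2, values ["us", "mobile", 42.0] yield the key ["us", "mobile"] (illustrative values)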
return upsert(new Key(keyValues), record);
}
}
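
To make the buffer sizing above concrete: _capacity is the trim target and _maxCapacity the collection ceiling.
Restating the constructor's formula as a standalone helper with two worked values (hypothetical helper, thresholds
copied from the code above):

static int maxCapacity(int capacity) {
  return capacity <= 100_000 ? 1_000_000 : (int) (capacity * 1.2);
}
// maxCapacity(5_000)   == 1_000_000  (small capacity: huge buffer, resizer keeps a PQ of records to retain)
// maxCapacity(200_000) == 240_000    (large capacity: ~20% buffer, resizer keeps a PQ of records to evict)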

SimpleIndexedTable.java

@@ -23,11 +23,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -39,23 +37,15 @@
public class SimpleIndexedTable extends IndexedTable {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIndexedTable.class);

private Map<Key, Record> _lookupMap;
private final Map<Key, Record> _lookupMap;
private Iterator<Record> _iterator;

private boolean _noMoreNewRecords = false;
private int _numResizes = 0;
private long _resizeTime = 0;

/**
* Initializes the data structures needed for this Table
* @param dataSchema data schema of the record's keys and values
* @param aggregationFunctions aggregation functions for the record's values
* @param orderByExpressions list of {@link OrderByExpressionContext} defining the order by
* @param capacity the capacity of the table
*/
public SimpleIndexedTable(DataSchema dataSchema, AggregationFunction[] aggregationFunctions,
@Nullable List<OrderByExpressionContext> orderByExpressions, int capacity) {
super(dataSchema, aggregationFunctions, orderByExpressions, capacity);
public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
super(dataSchema, queryContext, capacity);

_lookupMap = new HashMap<>();
}
@@ -94,7 +84,7 @@ public boolean upsert(Key key, Record newRecord) {
});

if (_lookupMap.size() >= _maxCapacity) {
if (_isOrderBy) {
if (_hasOrderBy) {
// reached max capacity, resize
resize(_capacity);
} else {
@@ -147,7 +137,7 @@ public Iterator<Record> iterator() {
@Override
public void finish(boolean sort) {

if (_isOrderBy) {
if (_hasOrderBy) {

if (sort) {
List<Record> sortedRecords = resizeAndSort(_capacity);

Table.java

@@ -28,17 +28,20 @@
public interface Table {

/**
* Update the table with the given record
* Updates the table with the given record, returns {@code true} if the table can take more records, {@code false}
* otherwise.
*/
boolean upsert(Record record);

/**
* Update the table with the given record, indexed on Key
* Updates the table with the given record indexed on the given key, returns {@code true} if the table can take more
* records, {@code false} otherwise.
*/
boolean upsert(Key key, Record record);

/**
* Merge all records from given table
* Merges all records from the given table, returns {@code true} if the table can take more records, {@code false}
* otherwise.
*/
boolean merge(Table table);

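
The sharpened javadoc pins down the boolean contract: upsert and merge report whether the table can take more records.
The indexed tables in this PR appear to always return true (a full table without order-by just stops accepting new
keys), but the contract lets an implementation signal early termination, which is why BaseTable.merge notes it must be
overridden in that case. A sketch of a caller honoring the contract (hypothetical helper, not from this PR):

// Assumes the pinot-core Table and Record types; stops feeding once the table
// reports that further records cannot change the result.
static void feed(Table table, java.util.Iterator<Record> records) {
  while (records.hasNext() && table.upsert(records.next())) {
    // keep going; a false return from upsert() ends the loop early
  }
}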