Fix the capacity of the DistinctTable #5204

Merged · 1 commit · Apr 3, 2020
DistinctTable.java

@@ -30,13 +30,15 @@
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.data.table.BaseTable;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.spi.utils.BytesUtils;


 /**
  * This serves the following purposes:
  *
@@ -47,21 +49,18 @@
  * uses {@link Set} to store unique records.
  */
 public class DistinctTable extends BaseTable {
-  private static final double LOAD_FACTOR = 0.75;
   private static final int MAX_INITIAL_CAPACITY = 64 * 1024;
   private Set<Record> _uniqueRecordsSet;
   private boolean _noMoreNewRecords;
   private Iterator<Record> _sortedIterator;

-  public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int limit) {
-    // TODO: see if 64k is the right max initial capacity to use
-    // if it turns out that users always use LIMIT N > 0.75 * 64k and
-    // there are indeed that many records, then there will be resizes.
-    // The current method of setting the initial capacity as
-    // min(64k, limit/loadFactor) will not require resizes for LIMIT N
-    // where N <= 48000
-    super(dataSchema, Collections.emptyList(), orderBy, limit);
-    int initialCapacity = Math.min(MAX_INITIAL_CAPACITY, Math.abs(nextPowerOfTwo((int) (limit / LOAD_FACTOR))));
+  public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int capacity) {
+    // NOTE: The passed in capacity is calculated based on the LIMIT in the query as Math.max(limit * 5, 5000). When
+    // LIMIT is smaller than (64 * 1024 * 0.75 (load factor) / 5 = 9830), then it is guaranteed that no resize is
+    // required.
+    super(dataSchema, Collections.emptyList(), orderBy, capacity);
+    int initialCapacity = Math.min(MAX_INITIAL_CAPACITY, HashUtil.getHashMapCapacity(capacity));
     _uniqueRecordsSet = new HashSet<>(initialCapacity);
     _noMoreNewRecords = false;
   }
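
To make the NOTE above concrete, here is a small self-contained check (not Pinot code) of the no-resize bound. It assumes HashUtil.getHashMapCapacity(n) computes roughly (int) (n / 0.75f) + 1, the usual way to size a hash table for n entries; the getHashMapCapacity below is a hypothetical stand-in, not the actual Pinot implementation.

public class DistinctCapacityCheck {
  private static final int MAX_INITIAL_CAPACITY = 64 * 1024;
  private static final float LOAD_FACTOR = 0.75f; // java.util.HashMap default

  // Hypothetical stand-in for HashUtil.getHashMapCapacity: the smallest initial capacity
  // that fits expectedSize entries without a resize at the default load factor.
  private static int getHashMapCapacity(int expectedSize) {
    return (int) (expectedSize / LOAD_FACTOR) + 1;
  }

  public static void main(String[] args) {
    for (int limit : new int[]{10, 5000, 9830, 9831}) {
      // Broker-side capacity formula quoted in the NOTE: Math.max(limit * 5, 5000).
      int capacity = Math.max(limit * 5, 5000);
      int initialCapacity = Math.min(MAX_INITIAL_CAPACITY, getHashMapCapacity(capacity));
      // A HashSet resizes once its size exceeds bucketCount * loadFactor. Once capped at
      // 64K buckets, the threshold is 64 * 1024 * 0.75 = 49152, so no resize happens as
      // long as capacity <= 49152, i.e. LIMIT <= 49152 / 5 = 9830 (the bound in the NOTE).
      boolean noResize = capacity <= (int) (MAX_INITIAL_CAPACITY * LOAD_FACTOR);
      System.out.println("limit=" + limit + ", capacity=" + capacity
          + ", initialCapacity=" + initialCapacity + ", noResize=" + noResize);
    }
  }
}

Running it prints noResize=true up to limit=9830 and false at 9831, matching the arithmetic in the comment.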
@@ -151,13 +150,15 @@ private void serializeColumns(final Object[] columns, final DataSchema.ColumnDataType
    * @param byteBuffer data to deserialize
    * @throws IOException
    */
-  public DistinctTable(ByteBuffer byteBuffer) throws IOException {
+  public DistinctTable(ByteBuffer byteBuffer)
+      throws IOException {
     // This is called by the BrokerReduceService when it de-serializes the
     // DISTINCT result from the DataTable. As of now we don't have all the
     // information to pass to super class so just pass null, empty lists
     // and the broker will set the correct information before merging the
     // data tables.
-    super(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), Collections.emptyList(), new ArrayList<>(), 0);
+    super(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), Collections.emptyList(), new ArrayList<>(),
+        0);
     DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
     _dataSchema = dataTable.getDataSchema();
     _uniqueRecordsSet = new HashSet<>();
@@ -214,18 +215,6 @@ public void addLimitAndOrderByInfo(BrokerRequest brokerRequest) {
     addCapacityAndOrderByInfo(brokerRequest.getOrderBy(), brokerRequest.getLimit());
   }

-  private static int nextPowerOfTwo(int val) {
-    if (val == 0 || val == 1) {
-      return val + 1;
-    }
-    int highestBit = Integer.highestOneBit(val);
-    if (highestBit == val) {
-      return val;
-    } else {
-      return highestBit << 1;
-    }
-  }
-
   private void resize(int trimToSize) {
     _tableResizer.resizeRecordsSet(_uniqueRecordsSet, trimToSize);
   }
DistinctAggregationFunction.java

@@ -33,6 +33,7 @@
 import org.apache.pinot.core.query.aggregation.DistinctTable;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;

@@ -43,15 +44,15 @@
 public class DistinctAggregationFunction implements AggregationFunction<DistinctTable, Comparable> {
   private final String[] _columns;
   private final List<SelectionSort> _orderBy;
-  private final int _limit;
+  private final int _capacity;

   public DistinctAggregationFunction(String multiColumnExpression, List<SelectionSort> orderBy, int limit) {
     _columns = multiColumnExpression.split(FunctionCallAstNode.DISTINCT_MULTI_COLUMN_SEPARATOR);
     _orderBy = orderBy;
-    // use a multiplier for trim size when DISTINCT queries have ORDER BY. This logic
-    // is similar to what we have in GROUP BY with ORDER BY
-    // this does not guarantee 100% accuracy but still takes closer to it
-    _limit = CollectionUtils.isNotEmpty(_orderBy) ? limit * 5 : limit;
+    // NOTE: DISTINCT with order-by is similar to group-by with order-by, where we limit the maximum number of unique
+    // records (groups) for each query to reduce the memory footprint. The result might not be 100% accurate in
+    // certain scenarios, but should give a good enough approximation.
+    _capacity = CollectionUtils.isNotEmpty(_orderBy) ? GroupByUtils.getTableCapacity(limit) : limit;
   }

   @Override
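
For reference, the NOTE in DistinctTable quotes the broker-side formula as Math.max(limit * 5, 5000), which suggests GroupByUtils.getTableCapacity looks roughly like the sketch below. This is an assumed reconstruction from that comment, not the actual GroupByUtils source:

public final class GroupByUtilsSketch {
  private static final int MIN_CAPACITY = 5000;  // floor so tiny LIMITs still keep enough records
  private static final int LIMIT_MULTIPLIER = 5; // over-provision relative to LIMIT for accuracy

  public static int getTableCapacity(int limit) {
    // Keeping several times more unique records than LIMIT makes it unlikely that trimming
    // during the server/broker merge drops rows that belong in the final top-LIMIT result.
    return Math.max(limit * LIMIT_MULTIPLIER, MIN_CAPACITY);
  }

  public static void main(String[] args) {
    System.out.println(getTableCapacity(10));   // 5000: the floor dominates small LIMITs
    System.out.println(getTableCapacity(2000)); // 10000: 5x the LIMIT
  }
}

The same capacity then flows into the DistinctTable constructor above, where it also determines the HashSet's initial capacity.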
@@ -82,7 +83,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolder
       columnDataTypes[i] = ColumnDataType.fromDataTypeSV(blockValSets[i].getValueType());
     }
     DataSchema dataSchema = new DataSchema(_columns, columnDataTypes);
-    distinctTable = new DistinctTable(dataSchema, _orderBy, _limit);
+    distinctTable = new DistinctTable(dataSchema, _orderBy, _capacity);
     aggregationResultHolder.setValue(distinctTable);
   }
@@ -114,7 +115,7 @@ public DistinctTable extractAggregationResult(AggregationResultHolder aggregationResultHolder
     ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
     // NOTE: Use STRING for unknown type
     Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-    return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderBy, _limit);
+    return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderBy, _capacity);
   }
 }