GroupBy: Cap dictionary-building selector memory usage. (#12309)
* GroupBy: Cap dictionary-building selector memory usage.

New context parameter "maxSelectorDictionarySize" controls when the
per-segment processing code should return early and trigger a trip
to the merge buffer.

Includes:

- Vectorized and nonvectorized implementations.
- Adjustments to GroupByQueryRunnerTest to exercise this code in
  the v2SmallDictionary suite. (Both the selector dictionary and
  the merging dictionary will be small in that suite.)
- Tests for the new config parameter.

* Fix issues from tests.

* Add "pre-existing" to dictionary.

* Simplify GroupByColumnSelectorStrategy interface by removing one of the writeToKeyBuffer methods.

* Adjustments from review comments.
gianm committed Mar 8, 2022
1 parent baea3ec commit 875e0696e01c4348fa31c77ec6fa333a324a53d8
Showing 34 changed files with 601 additions and 227 deletions.
@@ -2047,13 +2047,15 @@ Supported runtime properties:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|

Supported query contexts:

|Key|Description|
|---|-----------|
|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxSelectorDictionarySize` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
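These context keys might be supplied programmatically as shown in this sketch (the key names come from the table above; the chosen values and the plain `Map` construction are illustrative, not a specific Druid client API):

```java
import java.util.Map;

class GroupByContextExample {
    // Context keys from the tables above; the values are illustrative.
    static Map<String, Object> buildContext() {
        return Map.of(
            "maxSelectorDictionarySize", 50_000_000L,  // lower the per-segment dictionary cap
            "maxMergingDictionarySize", 50_000_000L,   // lower the per-query merging cap
            "maxOnDiskStorage", 1_000_000_000L         // allow up to ~1 GB of disk spill
        );
    }
}
```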

@@ -275,13 +275,18 @@ as the index, so the aggregated values in the array can be accessed directly wit

### Memory tuning and resource limits

When using groupBy v2, three parameters control resource usage and limits:
When using groupBy v2, four parameters control resource usage and limits:

- `druid.processing.buffer.sizeBytes`: size of the off-heap hash table used for aggregation, per query, in bytes. At
most `druid.processing.numMergeBuffers` of these will be created at once, which also serves as an upper limit on the
number of concurrently running groupBy queries.
- `druid.query.groupBy.maxMergingDictionarySize`: size of the on-heap dictionary used when grouping on strings, per query,
in bytes. Note that this is based on a rough estimate of the dictionary size, not the actual size.
- `druid.query.groupBy.maxSelectorDictionarySize`: size of the on-heap segment-level dictionary used when grouping on
string or array-valued expressions that do not have pre-existing dictionaries. There is at most one dictionary per
processing thread; therefore there are up to `druid.processing.numThreads` of these. Note that the size is based on a
rough estimate of the dictionary footprint.
- `druid.query.groupBy.maxMergingDictionarySize`: size of the on-heap query-level dictionary used when grouping on
any string expression. There is at most one dictionary per concurrently-running query; therefore there are up to
`druid.server.http.numThreads` of these. Note that the size is based on a rough estimate of the dictionary footprint.
- `druid.query.groupBy.maxOnDiskStorage`: amount of space on disk used for aggregation, per query, in bytes. By default,
this is 0, which means aggregation will not use disk.
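To make the per-thread vs. per-query distinction concrete, here is a hedged worst-case heap sketch: one selector dictionary per processing thread plus one merging dictionary per concurrent query. The thread counts in the test are assumed example values for `druid.processing.numThreads` and `druid.server.http.numThreads`, and the method is illustrative:

```java
class GroupByHeapBudget {
    // Worst-case heap used by groupBy dictionaries: one selector dictionary per
    // processing thread, plus one merging dictionary per concurrent query.
    static long worstCaseDictionaryHeap(
            long maxSelectorDictionarySize, int processingNumThreads,
            long maxMergingDictionarySize, int httpNumThreads
    ) {
        return processingNumThreads * maxSelectorDictionarySize
               + (long) httpNumThreads * maxMergingDictionarySize;
    }
}
```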

@@ -381,13 +386,15 @@ Supported runtime properties:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|

Supported query contexts:

|Key|Description|
|---|-----------|
|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxSelectorDictionarySize` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|

@@ -24,6 +24,7 @@
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;

/**
*
*/
public class GroupByQueryConfig
{
@@ -42,6 +43,7 @@
private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor";
private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE = "maxSelectorDictionarySize";
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
@@ -69,6 +71,11 @@
@JsonProperty
private int bufferGrouperInitialBuckets = 0;

@JsonProperty
// Size of on-heap string dictionary built while processing segments, per-processing-thread; when exceeded,
// partial results will be emitted to the merge buffer early.
private long maxSelectorDictionarySize = 100_000_000L;

@JsonProperty
// Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk
private long maxMergingDictionarySize = 100_000_000L;
@@ -151,6 +158,11 @@ public int getBufferGrouperInitialBuckets()
return bufferGrouperInitialBuckets;
}

public long getMaxSelectorDictionarySize()
{
return maxSelectorDictionarySize;
}

public long getMaxMergingDictionarySize()
{
return maxMergingDictionarySize;
@@ -230,6 +242,13 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(),
getMaxOnDiskStorage()
);
newConfig.maxSelectorDictionarySize = Math.min(
((Number) query.getContextValue(
CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE,
getMaxSelectorDictionarySize()
)).longValue(),
getMaxSelectorDictionarySize()
);
newConfig.maxMergingDictionarySize = Math.min(
((Number) query.getContextValue(
CTX_KEY_MAX_MERGING_DICTIONARY_SIZE,
@@ -243,7 +262,10 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
isApplyLimitPushDownToSegment()
);
newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation());
newConfig.forcePushDownNestedQuery = query.getContextBoolean(CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, isForcePushDownNestedQuery());
newConfig.forcePushDownNestedQuery = query.getContextBoolean(
CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY,
isForcePushDownNestedQuery()
);
newConfig.intermediateCombineDegree = query.getContextValue(
CTX_KEY_INTERMEDIATE_COMBINE_DEGREE,
getIntermediateCombineDegree()
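The `Math.min` pattern in `withOverrides` above means a query context value can only lower, never raise, the configured limit. A standalone sketch of that clamping rule (the method name is illustrative):

```java
class ContextClamp {
    // A context override is effective only when it is below the configured cap;
    // absent or larger values fall back to the configured limit.
    static long effectiveLimit(Number contextValue, long configured) {
        if (contextValue == null) {
            return configured;
        }
        return Math.min(contextValue.longValue(), configured);
    }
}
```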
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby.epinephelinae;

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.segment.DimensionDictionary;

import java.util.ArrayList;
import java.util.List;

/**
* Utilities for parts of the groupBy engine that need to build dictionaries.
*/
public class DictionaryBuilding
{
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES;

/**
* Creates a forward dictionary (dictionary ID -> value).
*/
public static <T> List<T> createDictionary()
{
return new ArrayList<>();
}

/**
* Creates a reverse dictionary (value -> dictionary ID). If a value is not present in the reverse dictionary,
* {@link Object2IntMap#getInt} will return {@link DimensionDictionary#ABSENT_VALUE_ID}.
*/
public static <T> Object2IntMap<T> createReverseDictionary()
{
final Object2IntOpenHashMap<T> m = new Object2IntOpenHashMap<>();
m.defaultReturnValue(DimensionDictionary.ABSENT_VALUE_ID);
return m;
}

/**
* Estimated footprint of a new entry.
*/
public static int estimateEntryFootprint(final int valueFootprint)
{
return valueFootprint + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
}
}
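A dependency-free sketch of the forward/reverse dictionary pattern that `DictionaryBuilding` encapsulates (the real class uses fastutil's `Object2IntOpenHashMap`; this stand-in uses `HashMap`, and `lookupOrAdd` is a hypothetical helper, not part of the class above):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DictionarySketch {
    static final int ABSENT_VALUE_ID = -1; // stand-in for DimensionDictionary.ABSENT_VALUE_ID

    private final List<String> forward = new ArrayList<>();       // dictionary ID -> value
    private final Map<String, Integer> reverse = new HashMap<>(); // value -> dictionary ID

    // Returns the existing ID for "value", or assigns the next sequential ID.
    int lookupOrAdd(String value) {
        final int id = reverse.getOrDefault(value, ABSENT_VALUE_ID);
        if (id != ABSENT_VALUE_ID) {
            return id;
        }
        final int newId = forward.size();
        forward.add(value);
        reverse.put(value, newId);
        return newId;
    }
}
```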
@@ -623,7 +623,10 @@ private static class HashAggregateIterator extends GroupByEngineIterator<ByteBuf
private final ByteBuffer keyBuffer;

private int stackPointer = Integer.MIN_VALUE;
protected boolean currentRowWasPartiallyAggregated = false;
private boolean currentRowWasPartiallyAggregated = false;

// Sum of internal state footprint across all "dims".
private long selectorInternalFootprint = 0;

public HashAggregateIterator(
GroupByQuery query,
@@ -717,12 +720,19 @@ protected Grouper<ByteBuffer> newGrouper()
@Override
protected void aggregateSingleValueDims(Grouper<ByteBuffer> grouper)
{
if (!currentRowWasPartiallyAggregated) {
for (GroupByColumnSelectorPlus dim : dims) {
dim.getColumnSelectorStrategy().reset();
}
selectorInternalFootprint = 0;
}

while (!cursor.isDone()) {
for (GroupByColumnSelectorPlus dim : dims) {
final GroupByColumnSelectorStrategy strategy = dim.getColumnSelectorStrategy();
strategy.writeToKeyBuffer(
selectorInternalFootprint += strategy.writeToKeyBuffer(
dim.getKeyBufferPosition(),
strategy.getOnlyValue(dim.getSelector()),
dim.getSelector(),
keyBuffer
);
}
@@ -731,21 +741,35 @@ protected void aggregateSingleValueDims(Grouper<ByteBuffer> grouper)
if (!grouper.aggregate(keyBuffer).isOk()) {
return;
}

cursor.advance();

// Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes
// us to go past the limit.)
if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
return;
}
}
}

@Override
protected void aggregateMultiValueDims(Grouper<ByteBuffer> grouper)
{
if (!currentRowWasPartiallyAggregated) {
for (GroupByColumnSelectorPlus dim : dims) {
dim.getColumnSelectorStrategy().reset();
}
selectorInternalFootprint = 0;
}

while (!cursor.isDone()) {
if (!currentRowWasPartiallyAggregated) {
// Set up stack, valuess, and first grouping in keyBuffer for this row
stackPointer = stack.length - 1;

for (int i = 0; i < dims.length; i++) {
GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy();
strategy.initColumnValues(
selectorInternalFootprint += strategy.initColumnValues(
dims[i].getSelector(),
i,
valuess
@@ -808,6 +832,12 @@ protected void aggregateMultiValueDims(Grouper<ByteBuffer> grouper)
// Advance to next row
cursor.advance();
currentRowWasPartiallyAggregated = false;

// Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes
// us to go past the limit.)
if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
return;
}
}
}
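The control flow above, checking the accumulated footprint only after the cursor advances so the triggering row is still fully aggregated, can be sketched in isolation (the names and the array-of-footprints input are illustrative, not Druid's API):

```java
class FootprintCappedLoop {
    // Accumulates per-row footprint and stops once the cap is exceeded; the
    // caller is expected to flush partial results and resume from that point.
    static int rowsAggregatedBeforeReturn(int[] rowFootprints, long maxFootprint) {
        long footprint = 0;
        int rows = 0;
        for (int f : rowFootprints) {
            footprint += f; // aggregate the row first...
            rows++;
            if (footprint > maxFootprint) {
                break;      // ...then check, mirroring the post-advance check above
            }
        }
        return rows;
    }
}
```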

@@ -882,6 +912,9 @@ protected void aggregateMultiValueDims(Grouper<Integer> grouper)

private void aggregateSingleValueDims(IntGrouper grouper)
{
// No need to track strategy internal state footprint, because array-based grouping does not use strategies.
// It accesses dimension selectors directly and only works on truly dictionary-coded columns.

while (!cursor.isDone()) {
final int key;
if (dim != null) {
@@ -900,6 +933,9 @@ private void aggregateSingleValueDims(IntGrouper grouper)

private void aggregateMultiValueDims(IntGrouper grouper)
{
// No need to track strategy internal state footprint, because array-based grouping does not use strategies.
// It accesses dimension selectors directly and only works on truly dictionary-coded columns.

if (dim == null) {
throw new ISE("dim must exist");
}
@@ -29,7 +29,6 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.SettableSupplier;
@@ -63,6 +62,7 @@
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionary;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.RowAdapter;
@@ -101,9 +101,6 @@
*/
public class RowBasedGrouperHelper
{
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES;

private static final int SINGLE_THREAD_CONCURRENCY_HINT = -1;
private static final int UNKNOWN_THREAD_PRIORITY = -1;
private static final long UNKNOWN_TIMEOUT = -1L;
@@ -1144,14 +1141,11 @@ private static int compareDimsInRowsWithAggs(

static long estimateStringKeySize(@Nullable String key)
{
long length = key == null ? 0 : key.length();
return length * Character.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
return DictionaryBuilding.estimateEntryFootprint((key == null ? 0 : key.length()) * Character.BYTES);
}
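The constants behind `estimateStringKeySize` make the arithmetic easy to check by hand; a self-contained copy of the estimate (mirroring `ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY` from `DictionaryBuilding`):

```java
class KeySizeEstimate {
    // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next
    // pointer in reverseDictionary nodes: 5 longs + 1 int = 44 bytes.
    static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES;

    static long estimateStringKeySize(String key) {
        final long length = key == null ? 0 : key.length();
        return length * Character.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
    }
}
```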

private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedGrouperHelper.RowBasedKey>
{
private static final int UNKNOWN_DICTIONARY_ID = -1;

private final boolean includeTimestamp;
private final boolean sortByDimsFirst;
private final List<DimensionSpec> dimensions;
@@ -1203,20 +1197,14 @@ private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedGroupe
this.valueTypes = valueTypes;
this.limitSpec = limitSpec;
this.enableRuntimeDictionaryGeneration = dictionary == null;
this.dictionary = enableRuntimeDictionaryGeneration ? new ArrayList<>() : dictionary;
this.reverseDictionary = enableRuntimeDictionaryGeneration ?
new Object2IntOpenHashMap<>() :
new Object2IntOpenHashMap<>(dictionary.size());

this.arrayDictionary = new ArrayList<>();
this.reverseArrayDictionary = new Object2IntOpenHashMap<>();
this.dictionary = enableRuntimeDictionaryGeneration ? DictionaryBuilding.createDictionary() : dictionary;
this.reverseDictionary = DictionaryBuilding.createReverseDictionary();

this.listDictionary = new ArrayList<>();
this.reverseListDictionary = new Object2IntOpenHashMap<>();
this.arrayDictionary = DictionaryBuilding.createDictionary();
this.reverseArrayDictionary = DictionaryBuilding.createReverseDictionary();

this.reverseDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID);
this.reverseArrayDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID);
this.reverseListDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID);
this.listDictionary = DictionaryBuilding.createDictionary();
this.reverseListDictionary = DictionaryBuilding.createReverseDictionary();

this.maxDictionarySize = maxDictionarySize;
this.serdeHelpers = makeSerdeHelpers(limitSpec != null, enableRuntimeDictionaryGeneration);
@@ -1534,7 +1522,7 @@ public boolean putToKeyBuffer(RowBasedKey key, int idx)
{
final ComparableList comparableList = (ComparableList) key.getKey()[idx];
int id = reverseListDictionary.getInt(comparableList);
if (id == UNKNOWN_DICTIONARY_ID) {
if (id == DimensionDictionary.ABSENT_VALUE_ID) {
id = listDictionary.size();
reverseListDictionary.put(comparableList, id);
listDictionary.add(comparableList);
@@ -1610,7 +1598,7 @@ public BufferComparator getBufferComparator()
private int addToArrayDictionary(final ComparableStringArray s)
{
int idx = reverseArrayDictionary.getInt(s);
if (idx == UNKNOWN_DICTIONARY_ID) {
if (idx == DimensionDictionary.ABSENT_VALUE_ID) {
idx = arrayDictionary.size();
reverseArrayDictionary.put(s, idx);
arrayDictionary.add(s);
@@ -1700,7 +1688,7 @@ public boolean putToKeyBuffer(RowBasedKey key, int idx)
private int addToDictionary(final String s)
{
int idx = reverseDictionary.getInt(s);
if (idx == UNKNOWN_DICTIONARY_ID) {
if (idx == DimensionDictionary.ABSENT_VALUE_ID) {
final long additionalEstimatedSize = estimateStringKeySize(s);
if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) {
return -1;
@@ -1732,7 +1720,7 @@ public boolean putToKeyBuffer(RowBasedKey key, int idx)
final String stringKey = (String) key.getKey()[idx];

final int dictIndex = reverseDictionary.getInt(stringKey);
if (dictIndex == UNKNOWN_DICTIONARY_ID) {
if (dictIndex == DimensionDictionary.ABSENT_VALUE_ID) {
throw new ISE("Cannot find key[%s] from dictionary", stringKey);
}
keyBuffer.putInt(dictIndex);
