Skip to content
Permalink
Browse files
Add setProcessingThreadNames context parameter. (#12514)
Setting thread names takes a measurable amount of time in the case where segment scans are very quick. In high-QPS testing we found a slight performance boost from turning off processing-thread renaming. This option makes that possible.
  • Loading branch information
gianm committed May 16, 2022
1 parent bb1a6de commit ff253fd8a32af4f74387bad14e8a4563af5c2bc0
Showing 4 changed files with 34 additions and 11 deletions.
@@ -62,6 +62,7 @@ Unless otherwise noted, the following parameters apply to all query types.
|secondaryPartitionPruning|`true`|Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions.|
|enableJoinLeftTableScanDirect|`false`|This flag applies to queries which have joins. For joins where the left child is a simple scan with a filter, by default, Druid will run the scan as a query and then join the results to the right child on the Broker. Setting this flag to true overrides that behavior, and Druid will attempt to push the join to data servers instead. Please note that the flag could be applicable to queries even if there is no explicit join, since queries can be internally translated into a join by the SQL planner.|
|debug| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following additional logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query |
|setProcessingThreadNames|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|maxNumericInFilters|`-1`|Max limit for the number of numeric values that can be compared for a string type dimension when the entire SQL WHERE clause of a query translates only to an [OR](../querying/filters.md#or) of [Bound filter](../querying/filters.md#bound-filter). By default, Druid does not restrict the number of numeric Bound Filters on String columns, although this situation may block other queries from running. Set this property to a smaller value to prevent Druid from running queries that have prohibitively long segment processing times. The optimal limit requires some trial and error; we recommend starting with 100. Users who submit a query that exceeds the limit of `maxNumericInFilters` should instead rewrite their queries to use strings in the `WHERE` clause instead of numbers. For example, `WHERE someString IN ('123', '456')`. This value cannot exceed the set system configuration `druid.sql.planner.maxNumericInFilters`. This value is ignored if `druid.sql.planner.maxNumericInFilters` is not set explicitly.|
|inSubQueryThreshold|`2147483647`| Threshold for minimum number of values in an IN clause to convert the query to a JOIN operation on an inlined table rather than a predicate. A threshold of 0 forces usage of an inline table in all cases; a threshold of [Integer.MAX_VALUE] forces usage of OR in all cases. |

@@ -19,6 +19,7 @@

package org.apache.druid.query.spec;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -45,6 +46,9 @@
private final QueryRunner<T> base;
private final SpecificSegmentSpec specificSpec;

@VisibleForTesting
static final String CTX_SET_THREAD_NAME = "setProcessingThreadNames";

public SpecificSegmentQueryRunner(
QueryRunner<T> base,
SpecificSegmentSpec specificSpec
@@ -64,18 +68,26 @@ public Sequence<T> run(final QueryPlus<T> input, final ResponseContext responseC
)
);

final Query<T> query = queryPlus.getQuery();
final boolean setName = input.getQuery().getContextBoolean(CTX_SET_THREAD_NAME, true);

final Thread currThread = Thread.currentThread();
final String currThreadName = currThread.getName();
final String newName = query.getType() + "_" + query.getDataSource() + "_" + query.getIntervals();
final Query<T> query = queryPlus.getQuery();

final Sequence<T> baseSequence = doNamed(
currThread,
currThreadName,
newName,
() -> base.run(queryPlus, responseContext)
);
final Thread currThread = setName ? Thread.currentThread() : null;
final String currThreadName = setName ? currThread.getName() : null;
final String newName = setName ? query.getType() + "_" + query.getDataSource() + "_" + query.getIntervals() : null;

final Sequence<T> baseSequence;

if (setName) {
baseSequence = doNamed(
currThread,
currThreadName,
newName,
() -> base.run(queryPlus, responseContext)
);
} else {
baseSequence = base.run(queryPlus, responseContext);
}

Sequence<T> segmentMissingCatchingSequence = new Sequence<T>()
{
@@ -149,7 +161,11 @@ public void close() throws IOException
@Override
public <RetType> RetType wrap(Supplier<RetType> sequenceProcessing)
{
return doNamed(currThread, currThreadName, newName, sequenceProcessing);
if (setName) {
return doNamed(currThread, currThreadName, newName, sequenceProcessing);
} else {
return sequenceProcessing.get();
}
}
}
);
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -181,6 +182,8 @@ public void run()
new CountAggregatorFactory("rows")
)
)
// Do one test with CTX_SET_THREAD_NAME = false.
.context(ImmutableMap.of(SpecificSegmentQueryRunner.CTX_SET_THREAD_NAME, false))
.build();
Sequence results = queryRunner.run(QueryPlus.wrap(query), responseContext);
List<Result<TimeseriesResultValue>> res = results.toList();
@@ -285,6 +285,7 @@ gzipped
hadoop
hasher
hashtable
high-QPS
historicals
hostname
hostnames
@@ -377,6 +378,7 @@ procs
programmatically
proto
proxied
QPS
quantile
quantiles
queryable
@@ -403,6 +405,7 @@ searchable
secondaryPartitionPruning
seekable-stream
servlet
setProcessingThreadNames
simple-client-sslcontext
sharded
sharding

0 comments on commit ff253fd

Please sign in to comment.