Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/_includes/generated/algorithm_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>taskmanager.runtime.hashjoin-bloom-filters</h5></td>
<td>false</td>
<td>Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles.</td>
</tr>
<tr>
<td><h5>taskmanager.runtime.max-fan</h5></td>
<td>128</td>
<td>The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small.</td>
</tr>
<tr>
<td><h5>taskmanager.runtime.sort-spilling-threshold</h5></td>
<td>0.8</td>
<td>A sort operation starts spilling when this fraction of its memory budget is full.</td>
</tr>
</tbody>
</table>
6 changes: 1 addition & 5 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,7 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic

### Runtime Algorithms

- `taskmanager.runtime.hashjoin-bloom-filters`: Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles. (DEFAULT: false)

- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).

- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling when this fraction of its memory budget is full (DEFAULT: 0.8).
{% include generated/algorithm_configuration.html %}

### Resource Manager

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
* Configuration parameters for join/sort algorithms.
*/
public class AlgorithmOptions {

	/**
	 * Switch for bloom filters in the hybrid hash join implementation.
	 *
	 * <p>Key: {@code taskmanager.runtime.hashjoin-bloom-filters}; default: {@code false}.
	 */
	public static final ConfigOption<Boolean> HASH_JOIN_BLOOM_FILTERS =
		key("taskmanager.runtime.hashjoin-bloom-filters")
			.defaultValue(false)
			.withDescription("Flag to activate/deactivate bloom filters in the hybrid hash join implementation." +
				" In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of" +
				" memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some" +
				" CPU cycles.");

	/**
	 * Maximal fan-in for external merge joins and fan-out for spilling hash tables.
	 *
	 * <p>Key: {@code taskmanager.runtime.max-fan}; default: {@code 128}.
	 */
	public static final ConfigOption<Integer> SPILLING_MAX_FAN =
		key("taskmanager.runtime.max-fan")
			.defaultValue(128)
			.withDescription("The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits" +
				" the number of file handles per operator, but may cause intermediate merging/partitioning, if set too" +
				" small.");

	/**
	 * Fraction of a sort operation's memory budget that must be full before it starts spilling.
	 *
	 * <p>Key: {@code taskmanager.runtime.sort-spilling-threshold}; default: {@code 0.8}.
	 */
	public static final ConfigOption<Float> SORT_SPILLING_THRESHOLD =
		key("taskmanager.runtime.sort-spilling-threshold")
			.defaultValue(0.8f)
			.withDescription("A sort operation starts spilling when this fraction of its memory budget is full.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.TempMode;
Expand Down Expand Up @@ -145,16 +146,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
* Creates a new job graph generator that uses the default values for its resource configuration.
*/
public JobGraphGenerator() {
this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN;
this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD;
this.defaultMaxFan = AlgorithmOptions.SPILLING_MAX_FAN.defaultValue();
this.defaultSortSpillingThreshold = AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue();
this.useLargeRecordHandler = ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER;
}

public JobGraphGenerator(Configuration config) {
this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
this.defaultMaxFan = config.getInteger(AlgorithmOptions.SPILLING_MAX_FAN);
this.defaultSortSpillingThreshold = config.getFloat(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
this.useLargeRecordHandler = config.getBoolean(
ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator;
Expand Down Expand Up @@ -80,9 +80,8 @@ public void initialize() throws Exception {
this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());

double availableMemory = config.getRelativeMemoryDriver();
boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration()
.getBoolean(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS);

ExecutionConfig executionConfig = taskContext.getExecutionConfig();
objectReuseEnabled = executionConfig.isObjectReuseEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
Expand Down Expand Up @@ -123,9 +123,8 @@ public void prepare() throws Exception{
LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}

boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration()
.getBoolean(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS);

// create and return joining iterator according to provided local strategy.
if (objectReuseEnabled) {
Expand Down