
[FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.

Also make sure that most tests run with bloom filters enabled, to increase test coverage.
StephanEwen committed Aug 6, 2015
1 parent 61dcae3 commit 0b73b43
Showing 31 changed files with 338 additions and 377 deletions.
15 changes: 10 additions & 5 deletions docs/setup/config.md
@@ -244,11 +244,6 @@ free for objects created by user-defined functions. (DEFAULT: 0.7)
This parameter is only evaluated, if `taskmanager.memory.size` is not set.
- `jobclient.polling.interval`: The interval (in seconds) in which the client
polls the JobManager for the status of its job (DEFAULT: 2).
- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
fan-out for spilling hash tables. Limits the number of file handles per operator,
but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
when this fraction of its memory budget is full (DEFAULT: 0.8).
- `taskmanager.heartbeat-interval`: The interval in which the TaskManager sends
heartbeats to the JobManager.
- `jobmanager.max-heartbeat-delay-before-failure.msecs`: The maximum time that a
@@ -324,6 +319,16 @@ sample exceeds this value (possible because of misconfiguration of the parser),
the sampling aborts. This value can be overridden for a specific input with the
input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).

### Runtime Algorithms

- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
fan-out for spilling hash tables. Limits the number of file handles per operator,
but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
when this fraction of its memory budget is full (DEFAULT: 0.8).
- `taskmanager.runtime.hashjoin-bloom-filters`: If true, the hash join uses bloom filters
to pre-filter records against spilled partitions; see the example below (DEFAULT: true).
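
As an illustration, the sketch below resolves this flag against its default using the
`Configuration.getBoolean` pattern that the join drivers in this commit use. It is a minimal
sketch: the locally constructed `Configuration` stands in for the TaskManager configuration
normally loaded from `flink-conf.yaml`, and the class name `RuntimeAlgorithmFlags` is made up
for the example.

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class RuntimeAlgorithmFlags {

    public static void main(String[] args) {
        // Stand-in for the TaskManager configuration; equivalent to the
        // flink-conf.yaml entry 'taskmanager.runtime.hashjoin-bloom-filters: false'.
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY, false);

        // Resolve the flag against its default (true), as the join drivers do.
        boolean useBloomFilters = config.getBoolean(
                ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
                ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);

        System.out.println("hash join bloom filters enabled: " + useBloomFilters);
    }
}
```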


## YARN


@@ -172,6 +172,8 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";

// --------------------------- Runtime Algorithms -------------------------------

/**
* Parameter for the maximum fan for out-of-core algorithms.
* Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
@@ -184,18 +186,17 @@ public final class ConfigConstants {
* sorter will start spilling to disk.
*/
public static final String DEFAULT_SORT_SPILLING_THRESHOLD_KEY = "taskmanager.runtime.sort-spilling-threshold";

/**
* Parameter to switch hash join bloom filters for spilled partitions on and off.
*/
public static final String RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY = "taskmanager.runtime.hashjoin-bloom-filters";

/**
* The config parameter defining the timeout for filesystem stream opening.
* A value of 0 indicates infinite waiting.
*/
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";

/**
* While spill probe record to disk during probe phase, whether enable bloom filter to filter the probe records
* to minimize the spilled probe records count.
*/
public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";

// ------------------------ YARN Configuration ------------------------

@@ -543,6 +544,13 @@ public final class ConfigConstants {
* The default task manager's maximum registration duration
*/
public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";

// ------------------------ Runtime Algorithms ------------------------

/**
* Default setting for the switch for hash join bloom filters for spilled partitions.
*/
public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = true;

/**
* The default value for the maximum spilling fan in/out.
@@ -558,15 +566,9 @@
* The default timeout for filesystem stream opening: infinite (means max long milliseconds).
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;

/**
* Enable bloom filter for hash join as it promote hash join performance most of the time.
*/
public static final boolean DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER = true;

// ------------------------ YARN Configuration ------------------------


/**
* Minimum amount of Heap memory to subtract from the requested TaskManager size.
* We came up with these values experimentally.
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;

import java.util.Map;
import java.util.concurrent.Future;
@@ -72,14 +73,11 @@ public interface Environment {
Configuration getTaskConfiguration();

/**
* @return The task manager configuration
*/
Configuration getTaskManagerConfiguration();

/**
* @return Hostname of the task manager
* Gets the task manager info, with configuration and hostname.
*
* @return The task manager info, with configuration and hostname.
*/
String getHostname();
TaskManagerRuntimeInfo getTaskManagerInfo();

/**
* Returns the job-wide configuration object that was attached to the JobGraph.
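For illustration, a minimal migration sketch for call sites of this interface, using the
hypothetical helper class `TaskManagerInfoUsage`: `getConfiguration()` on the returned
`TaskManagerRuntimeInfo` is exercised by this commit's join drivers, while `getHostname()`
is assumed from the javadoc above ("with configuration and hostname").

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;

class TaskManagerInfoUsage {

    // Call sites that previously paired getTaskManagerConfiguration() with
    // getHostname() now read both through the bundled runtime info object.
    static void logTaskManager(Environment env) {
        TaskManagerRuntimeInfo info = env.getTaskManagerInfo();
        Configuration tmConfig = info.getConfiguration();

        // getHostname() is assumed from the interface javadoc; it is not shown in this diff.
        System.out.println("Running on TaskManager " + info.getHostname()
                + " with " + tmConfig.keySet().size() + " config entries");
    }
}
```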
@@ -24,6 +24,7 @@
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
@@ -74,7 +75,10 @@ public void initialize() throws Exception {
this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());

double availableMemory = config.getRelativeMemoryDriver();

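// Resolve from the TaskManager configuration whether the hash join may use
// bloom filters to pre-filter probe records against spilled partitions.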
boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);

ExecutionConfig executionConfig = taskContext.getExecutionConfig();
objectReuseEnabled = executionConfig.isObjectReuseEnabled();

@@ -89,7 +93,8 @@ public void initialize() throws Exception {
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);


} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -102,7 +107,8 @@ public void initialize() throws Exception {
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);

} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -118,7 +124,8 @@ public void initialize() throws Exception {
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);


} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -131,7 +138,8 @@ public void initialize() throws Exception {
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);

} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -148,12 +156,10 @@ public void prepare() throws Exception {

@Override
public void run() throws Exception {

final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();

while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));

}

@Override
@@ -19,25 +19,27 @@
package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The join driver implements the logic of a join operator at runtime. It instantiates either
* hash or sort-merge based strategies to find joining pairs of records.
@@ -115,34 +117,73 @@ public void prepare() throws Exception{
if (LOG.isDebugEnabled()) {
LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}

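// Whether hash joins may use bloom filters to cut down the number of
// probe-side records that get spilled for already-spilled partitions.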
boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);

// create and return joining iterator according to provided local strategy.
if (objectReuseEnabled) {
switch (ls) {
case MERGE:
this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());

this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
default:
throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
}
} else {
switch (ls) {
case MERGE:
this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());

break;
case HYBRIDHASH_BUILD_FIRST:
this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
default:
throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
@@ -16,16 +16,14 @@
* limitations under the License.
*/


package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.functions.Function;


/**
* The interface to be implemented by all pact drivers that run alone (or as the primary driver) in a nephele task.
* The driver is the code that deals with everything that specific to a certain PACT. It implements the actual
* <i>map</i> or <i>reduce</i> specific code.
* The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
* A driver implements the actual code to perform a batch operation, like <i>map()</i>,
* <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
*
* @see PactTaskContext
*
Expand All @@ -37,7 +35,7 @@ public interface PactDriver<S extends Function, OT> {
void setup(PactTaskContext<S, OT> context);

/**
* Gets the number of inputs (= Nephele Gates and Readers) that the task has.
* Gets the number of inputs that the task has.
*
* @return The number of inputs.
*/
@@ -26,15 +26,14 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;


/**
* A runtime task is the task that is executed by the flink engine inside a task vertex.
* It typically has a {@link PactDriver}, and optionally multiple chained drivers. In addition, it
* deals with the runtime setup and teardown and the control-flow logic. The latter appears especially
* in the case of iterations.
* The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
* the runtime components and configuration that they can use to fulfil their task.
*
* @param <S> The UDF type.
* @param <OT> The produced data type.
Expand All @@ -44,6 +43,8 @@
public interface PactTaskContext<S, OT> {

TaskConfig getTaskConfig();

TaskManagerRuntimeInfo getTaskManagerInfo();

ClassLoader getUserCodeClassLoader();

