From 005b6c3559031e53c31d0a44499d01f89517d6b4 Mon Sep 17 00:00:00 2001 From: Timothy Farkas Date: Thu, 8 Feb 2018 15:25:59 -0800 Subject: [PATCH] DRILL-6143: Made FragmentsRunner's rpc timeout larger to reduce random failures and made it configurable as a SystemOption. --- .../src/main/java/org/apache/drill/exec/ExecConstants.java | 2 ++ .../drill/exec/server/options/SystemOptionManager.java | 3 ++- .../org/apache/drill/exec/work/foreman/FragmentsRunner.java | 5 ++--- exec/java-exec/src/main/resources/drill-module.conf | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index c949e515322..f3572d80930 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -59,6 +59,8 @@ private ExecConstants() { public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads"; public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads"; public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads"; + public static final String FRAG_RUNNER_RPC_TIMEOUT = "drill.exec.rpc.fragrunner.timeout"; + public static final PositiveLongValidator FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR = new PositiveLongValidator(FRAG_RUNNER_RPC_TIMEOUT, Long.MAX_VALUE); public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory"; public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem"; public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 4dba96dbcca..2b170e7913e 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -212,7 +212,8 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition new OptionDefinition(ExecConstants.CPU_LOAD_AVERAGE), new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR), new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR), - new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)) + new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), + new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), }; final CaseInsensitiveMap<OptionDefinition> map = CaseInsensitiveMap.newHashMap(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java index 2e5f2dd06e1..b6775765a80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java @@ -24,6 +24,7 @@ import org.apache.drill.common.concurrent.ExtendedLatch; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.proto.BitControl.InitializeFragments; @@ -61,8 +62,6 @@ public class FragmentsRunner { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class); private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(FragmentsRunner.class); - private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000; - private final WorkerBee bee; private final UserClientConnection initiatingClient; private final DrillbitContext drillbitContext; @@ -278,7 +277,7 @@ private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); } - final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments; + final long timeout = drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT) * numIntFragments; if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) { long numberRemaining = endpointLatch.getCount(); throw UserException.connectionError() diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 5659c82ab07..39320c2080d 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -408,6 +408,7 @@ drill.exec.options: { debug.validate_iterators: false, debug.validate_vectors: false, drill.exec.functions.cast_empty_string_to_null: false, + drill.exec.rpc.fragrunner.timeout: 10000, # Setting to control if HashAgg should fallback to older behavior of consuming # unbounded memory. In case of 2 phase Agg when available memory is not enough # to start at least 2 partitions then HashAgg fallbacks to this case. It can be