Skip to content

Commit

Permalink
[hotfix] [flip6] Remove unnecessary timeout from SlotPool
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Feb 13, 2018
1 parent fd297f0 commit faa787c
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 26 deletions.
Expand Up @@ -135,22 +135,12 @@ public class JobManagerOptions {

public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
key("slot.request.timeout")
.defaultValue(10 * 60 * 1000L)
.defaultValue(5L * 60L * 1000L)
.withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");

public static final ConfigOption<Long> SLOT_REQUEST_RM_TIMEOUT =
key("slot.request.resourcemanager.timeout")
.defaultValue(10 * 1000L)
.withDescription("The timeout in milliseconds for sending a request to Resource Manager.");

public static final ConfigOption<Long> SLOT_ALLOCATION_RM_TIMEOUT =
key("slot.allocation.resourcemanager.timeout")
.defaultValue(5 * 60 * 1000L)
.withDescription("The timeout in milliseconds for allocation a slot from Resource Manager.");

public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
key("slot.idle.timeout")
.defaultValue(5 * 60 * 1000L)
.defaultValue(20L * 1000L)
.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");

// ---------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -280,8 +280,7 @@ public JobMaster(
rpcService,
jobGraph.getJobID(),
SystemClock.getInstance(),
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)),
rpcTimeout,
rpcTimeout, Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)),
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT)));

this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
Expand Down Expand Up @@ -137,17 +138,17 @@ protected SlotPool(RpcService rpcService, JobID jobId) {
rpcService,
jobId,
SystemClock.getInstance(),
Time.milliseconds(JobManagerOptions.SLOT_ALLOCATION_RM_TIMEOUT.defaultValue()),
Time.milliseconds(JobManagerOptions.SLOT_REQUEST_RM_TIMEOUT.defaultValue()),
AkkaUtils.getDefaultTimeout(),
Time.milliseconds(JobManagerOptions.SLOT_REQUEST_TIMEOUT.defaultValue()),
Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()));
}

public SlotPool(
RpcService rpcService,
JobID jobId,
Clock clock,
Time slotRequestTimeout,
Time rpcTimeout,
Time slotRequestTimeout,
Time idleSlotTimeout) {

super(rpcService);
Expand Down Expand Up @@ -714,7 +715,9 @@ private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Thr
private void checkTimeoutSlotAllocation(SlotRequestId slotRequestID) {
PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
if (request != null) {
failPendingRequest(request, new TimeoutException("Slot allocation request " + slotRequestID + " timed out"));
failPendingRequest(
request,
new TimeoutException("Slot allocation request " + slotRequestID + " timed out"));
}
}

Expand All @@ -733,7 +736,7 @@ private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId slotRequ
if (request != null) {
failPendingRequest(
request,
new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
new TimeoutException("No slot available and no connection to Resource Manager established."));
}
}

Expand Down
Expand Up @@ -1268,7 +1268,8 @@ class JobManager(
true
}

val allocationTimeout: Long = flinkConfiguration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)
val allocationTimeout: Long = flinkConfiguration.getLong(
JobManagerOptions.SLOT_REQUEST_TIMEOUT)

executionGraph = ExecutionGraphBuilder.buildGraph(
executionGraph,
Expand Down
Expand Up @@ -110,8 +110,8 @@ public void testSlotAllocationNoResourceManager() throws Exception {
rpcService,
jid,
SystemClock.getInstance(),
TestingUtils.infiniteTime(),
Time.milliseconds(10L), // this is the timeout for the request tested here
Time.milliseconds(10L), TestingUtils.infiniteTime(),
// this is the timeout for the request tested here
TestingUtils.infiniteTime()
);

Expand Down Expand Up @@ -360,8 +360,7 @@ public TestingSlotPool(
rpcService,
jobId,
clock,
slotRequestTimeout,
rpcTimeout,
rpcTimeout, slotRequestTimeout,
idleSlotTimeout);

releaseSlotConsumer = null;
Expand Down
Expand Up @@ -586,8 +586,7 @@ public void testCheckIdleSlot() throws Exception {
rpcService,
jobId,
clock,
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(), TestingUtils.infiniteTime(),
timeout);

try {
Expand Down

0 comments on commit faa787c

Please sign in to comment.