From 04da09c085764349ca618eb543627570da3775fe Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 24 Jul 2015 15:31:59 -0700 Subject: [PATCH] WIP attempt at implementing socket read timeouts (SPARK-9328) --- .../NettyBlockTransferTimeoutSuite.scala | 58 +++++++++++++++++++ .../spark/network/TransportContext.java | 2 + .../spark/network/util/TransportConf.java | 5 ++ 3 files changed, 65 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferTimeoutSuite.scala diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferTimeoutSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferTimeoutSuite.scala new file mode 100644 index 0000000000000..a2d3f78074604 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferTimeoutSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.netty + +import scala.concurrent.duration._ + +import io.netty.handler.timeout.ReadTimeoutException +import org.mockito.Mockito._ +import org.mockito.Matchers.any +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.FunSuite + +import org.apache.spark.{SparkConf, SecurityManager} +import org.apache.spark.network.BlockDataManager +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.storage.ShuffleBlockId +import org.scalatest.concurrent.Timeouts + +class NettyBlockTransferTimeoutSuite extends FunSuite with Timeouts { + test("read timeout") { + val conf = new SparkConf() + .set("spark.app.id", "appId") + .set("spark.shuffle.io.readTimeout", "5") + val securityManager = new SecurityManager(conf) + val bts = new NettyBlockTransferService(conf, securityManager, numCores = 1) + val blockDataManager = mock(classOf[BlockDataManager], RETURNS_SMART_NULLS) + when(blockDataManager.getBlockData(any())).thenAnswer(new Answer[ManagedBuffer] { + override def answer(invocation: InvocationOnMock): ManagedBuffer = { + Thread.sleep(30 * 10000) /* NOTE(review): 300 s, not 30 s -- confirm intended */ + null + } + }) + bts.init(blockDataManager) + + + failAfter(30.seconds) { + intercept[ReadTimeoutException] { + bts.fetchBlockSync(bts.hostName, bts.port, "execId", ShuffleBlockId(0, 0, 0).toString) + } + } + } +} diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java index 5bc6e5a2418a9..58f7c7b898257 100644 --- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import io.netty.channel.Channel; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-104,6 +105,7 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) { try { TransportChannelHandler channelHandler = createChannelHandler(channel); channel.pipeline() + .addLast("readTimeoutHandler", new ReadTimeoutHandler(conf.readTimeoutSeconds())) .addLast("encoder", encoder) .addLast("frameDecoder", NettyUtils.createFrameDecoder()) .addLast("decoder", decoder) diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 7c9adf52af0f0..b723349d2f787 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -40,6 +40,11 @@ public int connectionTimeoutMs() { return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000; } + /** Read timeout in seconds. Default 120 secs. */ + public int readTimeoutSeconds() { + return conf.getInt("spark.shuffle.io.readTimeout", 120); + } + /** Number of concurrent connections between two nodes for fetching data. */ public int numConnectionsPerPeer() { return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);