-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
[SPARK-3453] Netty-based BlockTransferService #2330
Changes from all commits
5bb88f5
9b3b397
dd783ff
b5b380e
1474824
b404da3
fbf882d
b32c3fe
d135fa3
1e0d277
6e84cb2
55266d1
f83611e
6ddaa5d
8295561
d7d0aac
29fe0cc
e92dad7
a79a259
088ed8a
323dfec
5814292
f23e682
ba8c441
ca88068
dfc2c34
3fbfd3f
69f5d0a
bc9ed22
a3a09f6
0140d6e
0dae310
ad09236
bdab2c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,14 +20,14 @@ package org.apache.spark.network | |
import org.apache.spark.storage.StorageLevel | ||
|
||
|
||
private[spark] | ||
trait BlockDataManager { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Did you add this class at some point? Either way, would you mind adding a class comment? It's not clear how it differs from all the other block-related managers. |
||
|
||
/** | ||
* Interface to get local block data. | ||
* | ||
* @return Some(buffer) if the block exists locally, and None if it doesn't. | ||
* Interface to get local block data. Throws an exception if the block cannot be found or | ||
* cannot be read successfully. | ||
*/ | ||
def getBlockData(blockId: String): Option[ManagedBuffer] | ||
def getBlockData(blockId: String): ManagedBuffer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure how I feel about this taking a blockId as a string, especially if it's implemented by the BlockManager itself. What's the reasoning behind not taking a BlockId? Just too many users of the API that only have a String? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see the pr description on the todos as separate prs |
||
|
||
/** | ||
* Put the block locally, using the given storage level. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,17 @@ | |
|
||
package org.apache.spark.network | ||
|
||
import java.io.Closeable | ||
import java.nio.ByteBuffer | ||
|
||
import scala.concurrent.{Await, Future} | ||
import scala.concurrent.duration.Duration | ||
|
||
import org.apache.spark.storage.StorageLevel | ||
|
||
|
||
abstract class BlockTransferService { | ||
private[spark] | ||
abstract class BlockTransferService extends Closeable { | ||
|
||
/** | ||
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch | ||
|
@@ -34,7 +38,7 @@ abstract class BlockTransferService { | |
/** | ||
* Tear down the transfer service. | ||
*/ | ||
def stop(): Unit | ||
def close(): Unit | ||
|
||
/** | ||
* Port number the service is listening on, available only after [[init]] is invoked. | ||
|
@@ -50,9 +54,6 @@ abstract class BlockTransferService { | |
* Fetch a sequence of blocks from a remote node asynchronously, | ||
* available only after [[init]] is invoked. | ||
* | ||
* Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block, | ||
* while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure (not per block). | ||
* | ||
* Note that this API takes a sequence so the implementation can batch requests, and does not | ||
* return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as | ||
* the data of a block is fetched, rather than waiting for all blocks to be fetched. | ||
|
@@ -83,15 +84,18 @@ abstract class BlockTransferService { | |
val lock = new Object | ||
@volatile var result: Either[ManagedBuffer, Throwable] = null | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Out of curiosity, is there an advantage of this over |
||
fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener { | ||
override def onBlockFetchFailure(exception: Throwable): Unit = { | ||
override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { | ||
lock.synchronized { | ||
result = Right(exception) | ||
lock.notify() | ||
} | ||
} | ||
override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { | ||
lock.synchronized { | ||
result = Left(data) | ||
val ret = ByteBuffer.allocate(data.size.toInt) | ||
ret.put(data.nioByteBuffer()) | ||
ret.flip() | ||
result = Left(new NioManagedBuffer(ret)) | ||
lock.notify() | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,8 @@ import java.nio.channels.FileChannel.MapMode | |
import scala.util.Try | ||
|
||
import com.google.common.io.ByteStreams | ||
import io.netty.buffer.{ByteBufInputStream, ByteBuf} | ||
import io.netty.buffer.{Unpooled, ByteBufInputStream, ByteBuf} | ||
import io.netty.channel.DefaultFileRegion | ||
|
||
import org.apache.spark.util.{ByteBufferInputStream, Utils} | ||
|
||
|
@@ -34,11 +35,17 @@ import org.apache.spark.util.{ByteBufferInputStream, Utils} | |
* This interface provides an immutable view for data in the form of bytes. The implementation | ||
* should specify how the data is provided: | ||
* | ||
* - FileSegmentManagedBuffer: data backed by part of a file | ||
* - NioByteBufferManagedBuffer: data backed by a NIO ByteBuffer | ||
* - NettyByteBufManagedBuffer: data backed by a Netty ByteBuf | ||
* - [[FileSegmentManagedBuffer]]: data backed by part of a file | ||
* - [[NioManagedBuffer]]: data backed by a NIO ByteBuffer | ||
* - [[NettyManagedBuffer]]: data backed by a Netty ByteBuf | ||
* | ||
* The concrete buffer implementation might be managed outside the JVM garbage collector. | ||
* For example, in the case of [[NettyManagedBuffer]], the buffers are reference counted. | ||
* In that case, if the buffer is going to be passed around to a different thread, retain/release | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The comment here suggests that you should only have to retain if your buffer is a NettyManagedBuffer, which isn't quite true, as you could have an NioManagedBuffer whose ByteBuffer underlies a Netty ByteBuf, in which case it is never safe to pass around. I'm just a little worried about sanitary buffer usage, as misuse of this API from not copying could lead to nondeterministic data corruption. |
||
* should be called. | ||
*/ | ||
sealed abstract class ManagedBuffer { | ||
private[spark] | ||
abstract class ManagedBuffer { | ||
// Note that all the methods are defined with parenthesis because their implementations can | ||
// have side effects (io operations). | ||
|
||
|
@@ -57,12 +64,29 @@ sealed abstract class ManagedBuffer { | |
* it does not go over the limit. | ||
*/ | ||
def inputStream(): InputStream | ||
|
||
/** | ||
* Increment the reference count by one if applicable. | ||
*/ | ||
def retain(): this.type | ||
|
||
/** | ||
* If applicable, decrement the reference count by one and deallocates the buffer if the | ||
* reference count reaches zero. | ||
*/ | ||
def release(): this.type | ||
|
||
/** | ||
* Convert the buffer into an Netty object, used to write the data out. | ||
*/ | ||
private[network] def convertToNetty(): AnyRef | ||
} | ||
|
||
|
||
/** | ||
* A [[ManagedBuffer]] backed by a segment in a file | ||
*/ | ||
private[spark] | ||
final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long) | ||
extends ManagedBuffer { | ||
|
||
|
@@ -113,34 +137,64 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt | |
} | ||
} | ||
|
||
private[network] override def convertToNetty(): AnyRef = { | ||
val fileChannel = new FileInputStream(file).getChannel | ||
new DefaultFileRegion(fileChannel, offset, length) | ||
} | ||
|
||
// Content of file segments are not in-memory, so no need to reference count. | ||
override def retain(): this.type = this | ||
override def release(): this.type = this | ||
|
||
override def toString: String = s"${getClass.getName}($file, $offset, $length)" | ||
} | ||
|
||
|
||
/** | ||
* A [[ManagedBuffer]] backed by [[java.nio.ByteBuffer]]. | ||
*/ | ||
final class NioByteBufferManagedBuffer(buf: ByteBuffer) extends ManagedBuffer { | ||
private[spark] | ||
final class NioManagedBuffer(buf: ByteBuffer) extends ManagedBuffer { | ||
|
||
override def size: Long = buf.remaining() | ||
|
||
override def nioByteBuffer() = buf.duplicate() | ||
|
||
override def inputStream() = new ByteBufferInputStream(buf) | ||
|
||
private[network] override def convertToNetty(): AnyRef = Unpooled.wrappedBuffer(buf) | ||
|
||
// [[ByteBuffer]] is managed by the JVM garbage collector itself. | ||
override def retain(): this.type = this | ||
override def release(): this.type = this | ||
|
||
override def toString: String = s"${getClass.getName}($buf)" | ||
} | ||
|
||
|
||
/** | ||
* A [[ManagedBuffer]] backed by a Netty [[ByteBuf]]. | ||
*/ | ||
final class NettyByteBufManagedBuffer(buf: ByteBuf) extends ManagedBuffer { | ||
private[spark] | ||
final class NettyManagedBuffer(buf: ByteBuf) extends ManagedBuffer { | ||
|
||
override def size: Long = buf.readableBytes() | ||
|
||
override def nioByteBuffer() = buf.nioBuffer() | ||
|
||
override def inputStream() = new ByteBufInputStream(buf) | ||
|
||
// TODO(rxin): Promote this to top level ManagedBuffer interface and add documentation for it. | ||
def release(): Unit = buf.release() | ||
private[network] override def convertToNetty(): AnyRef = buf | ||
|
||
override def retain(): this.type = { | ||
buf.retain() | ||
this | ||
} | ||
|
||
override def release(): this.type = { | ||
buf.release() | ||
this | ||
} | ||
|
||
override def toString: String = s"${getClass.getName}($buf)" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be better if we could go ahead and make the config stable; it's always painful for people to update later. Even if it's just like
Potentially less pain for when we address the TODO at a later date.