[WIP][Test only][DEMO][SPARK-6235]Address various 2G limits #14647

Closed
wants to merge 2 commits into base: master

Conversation

3 participants
@witgo
Contributor

witgo commented Aug 15, 2016

What changes were proposed in this pull request?

  1. Replace the DiskStore method def getBytes(blockId: BlockId): ChunkedByteBuffer with def getBlockData(blockId: BlockId): ManagedBuffer.
  2. Make ManagedBuffer's nioByteBuffer method return a ChunkedByteBuffer.
  3. Add the class ChunkFetchInputStream, used for flow control.
  4. Move the classes ChunkedByteBuffer, ChunkedByteBufferInputStream and ChunkedByteBufferOutputStream to common/network-common/src/main/java/org/apache/spark/network/buffer/.
  5. Replace ByteBuffer with ChunkedByteBuffer:
  • FileSegmentManagedBuffer
    add constructor public FileSegmentManagedBuffer(long memoryMapBytes, boolean lazyFileDescriptor, File file, long offset, long length)
  • NettyManagedBuffer
    Support Zero-copy in nioByteBuffer method
  • NioManagedBuffer
    add constructor public NioManagedBuffer(ChunkedByteBuffer buf)
  • RpcResponseCallback
    void onSuccess(ByteBuffer response) => void onSuccess(ChunkedByteBuffer response)
  • TransportClient
    public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) => public long sendRpc(ChunkedByteBuffer message, final RpcResponseCallback callback)
    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) => ChunkedByteBuffer sendRpcSync(ChunkedByteBuffer message, long timeoutMs)
    public void send(ByteBuffer message) => public void send(ChunkedByteBuffer message)
  • SaslRpcHandler
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) => public void receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback callback)
    public void receive(TransportClient client, ByteBuffer message) => public void receive(TransportClient client, ChunkedByteBuffer message)
  • NoOpRpcHandler
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) => public void receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback callback)
  • RpcHandler
    public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) => public abstract void receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback callback)
    public void receive(TransportClient client, ByteBuffer message) => public void receive(TransportClient client, ChunkedByteBuffer message)
    public void onSuccess(ByteBuffer response) => public void onSuccess(ChunkedByteBuffer response)
  • org.apache.spark.network.shuffle.protocol.Decoder
    BlockTransferMessage fromByteBuffer(ByteBuffer msg) => public static BlockTransferMessage fromByteBuffer(ChunkedByteBuffer msg)
  • TorrentBroadcast
    def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer], serializer: Serializer, compressionCodec: Option[CompressionCodec]) =>
    def unBlockifyObject[T: ClassTag](blocks: Array[ChunkedByteBuffer], serializer: Serializer, compressionCodec: Option[CompressionCodec])
  • Executor
    def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit =>
    def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ChunkedByteBuffer): Unit
  • ExecutorBackend
    def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit => def statusUpdate(taskId: Long, state: TaskState, data: ChunkedByteBuffer): Unit
  • OneWayOutboxMessage
    case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage => case class OneWayOutboxMessage(content: ChunkedByteBuffer) extends OutboxMessage
  • org.apache.spark.scheduler.Task
    serializeWithDependencies(task: Task[_], currentFiles: mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer: SerializerInstance): ByteBuffer =>
    serializeWithDependencies(task: Task[_], currentFiles: mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer: SerializerInstance): ChunkedByteBuffer
    deserializeWithDependencies(serializedTask: ByteBuffer): (HashMap[String, Long], HashMap[String, Long], Properties, ByteBuffer) =>
    deserializeWithDependencies(serializedTask: ChunkedByteBuffer): (HashMap[String, Long], HashMap[String, Long], Properties, ChunkedByteBuffer)
  • TaskDescription
    private[spark] class TaskDescription(val taskId: Long, val attemptNumber: Int, val executorId: String, val name: String, val index: Int, val serializedTask: ChunkedByteBuffer) => private[spark] class TaskDescription(val taskId: Long, val attemptNumber: Int, val executorId: String, val name: String, val index: Int, _serializedTask: ByteBuffer)
  • DirectTaskResult
    private[spark] class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T]=>
    private[spark] class DirectTaskResult[T](var valueBytes: ChunkedByteBuffer, var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T]
  • TaskResultGetter
    def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer): Unit => def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ChunkedByteBuffer): Unit
    def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ByteBuffer) => def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ChunkedByteBuffer)
  • TaskSchedulerImpl
    def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) => def statusUpdate(tid: Long, state: TaskState, serializedData: ChunkedByteBuffer)
  • CoarseGrainedClusterMessages.LaunchTask
    case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage => case class LaunchTask(data: ChunkedByteBuffer) extends CoarseGrainedClusterMessage
  • CoarseGrainedClusterMessages.StatusUpdate
    case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer) extends CoarseGrainedClusterMessage => case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: ChunkedByteBuffer) extends CoarseGrainedClusterMessage
  • org.apache.spark.scheduler.local.LocalSchedulerBackend
    private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) => private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ChunkedByteBuffer)
  • SerializerInstance
    def serialize[T: ClassTag](t: T): ByteBuffer => def serialize[T: ClassTag](t: T): ChunkedByteBuffer
    def deserialize[T: ClassTag](bytes: ByteBuffer): T => def deserialize[T: ClassTag](bytes: ChunkedByteBuffer): T
    def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T => def deserialize[T: ClassTag](bytes: ChunkedByteBuffer, loader: ClassLoader): T
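The core idea behind the changes above can be sketched as follows. This is a hypothetical, minimal illustration (not Spark's actual implementation): a single java.nio.ByteBuffer is indexed by an int and so caps out at Integer.MAX_VALUE bytes (~2 GB), but a wrapper over an array of chunks can report its total size as a long.

```java
import java.nio.ByteBuffer;

// Hypothetical minimal sketch of the ChunkedByteBuffer idea: a view over
// several ByteBuffer chunks whose total size is a long, so it can represent
// more than 2 GB even though each java.nio.ByteBuffer is capped at
// Integer.MAX_VALUE bytes. Names here are illustrative, not Spark's code.
public class ChunkedByteBufferSketch {
    private final ByteBuffer[] chunks;

    public ChunkedByteBufferSketch(ByteBuffer[] chunks) {
        this.chunks = chunks;
    }

    // The total size is a long: summing chunk sizes may overflow an int.
    public long size() {
        long total = 0;
        for (ByteBuffer chunk : chunks) {
            total += chunk.remaining();
        }
        return total;
    }

    // Materialize into one array; only legal when the total fits in an int.
    public byte[] toArray() {
        long n = size();
        if (n > Integer.MAX_VALUE) {
            throw new IllegalStateException("buffer too large: " + n);
        }
        byte[] out = new byte[(int) n];
        int offset = 0;
        for (ByteBuffer chunk : chunks) {
            ByteBuffer dup = chunk.duplicate(); // keep original position intact
            int len = dup.remaining();
            dup.get(out, offset, len);
            offset += len;
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.wrap(new byte[]{1, 2, 3});
        ByteBuffer b = ByteBuffer.wrap(new byte[]{4, 5});
        ChunkedByteBufferSketch buf = new ChunkedByteBufferSketch(new ByteBuffer[]{a, b});
        System.out.println(buf.size());           // prints 5
        System.out.println(buf.toArray().length); // prints 5
    }
}
```

Threading such a type through the RPC and serializer signatures, as the list above does, is what lets a block larger than 2 GB flow end to end without ever being squeezed into one ByteBuffer.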
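The stream classes mentioned in item 4 follow a related pattern. As a hedged sketch (illustrative names, not Spark's code), a ChunkedByteBufferInputStream-style class exposes many chunks as one sequential InputStream, so callers can process an arbitrarily large block without materializing it:

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch of the chunked-stream idea: present an array of
// ByteBuffer chunks as one sequential InputStream, so a block larger than
// 2 GB can be consumed incrementally. Illustrative, not Spark's code.
public class ChunkStreamSketch extends InputStream {
    private final ByteBuffer[] chunks;
    private int current = 0;

    public ChunkStreamSketch(ByteBuffer[] chunks) {
        this.chunks = chunks;
    }

    @Override
    public int read() {
        // Skip exhausted chunks until one has data or all are consumed.
        while (current < chunks.length && !chunks[current].hasRemaining()) {
            current++;
        }
        if (current == chunks.length) {
            return -1; // end of stream
        }
        return chunks[current].get() & 0xFF; // return the byte as unsigned
    }

    public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.wrap(new byte[]{10, 20});
        ByteBuffer b = ByteBuffer.wrap(new byte[]{30});
        InputStream in = new ChunkStreamSketch(new ByteBuffer[]{a, b});
        int v;
        while ((v = in.read()) != -1) {
            System.out.println(v); // prints 10, 20, 30 on separate lines
        }
    }
}
```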
SparkQA commented Aug 15, 2016

Test build #63783 has finished for PR 14647 at commit dc9ffd6.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class ChunkedByteBuffer implements Externalizable
    • public class ChunkedByteBufferInputStream extends InputStream
    • public class ChunkedByteBufferOutputStream extends OutputStream
    • public class ChunkFetchInputStream extends InputStream
    • public class ChunkFetchStreamCallback implements StreamCallback
    • case class LaunchTask(data: ChunkedByteBuffer) extends CoarseGrainedClusterMessage

@witgo witgo changed the title from [WIP][Test only][SPARK-6235]Address various 2G limits to [WIP][Test only][DEMO][SPARK-6235]Address various 2G limits Aug 15, 2016

SparkQA commented Aug 15, 2016

Test build #63784 has finished for PR 14647 at commit a211c1a.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
Contributor

hvanhovell commented Aug 15, 2016

@witgo Could you provide a more high-level description of this PR? I am curious which problem you are trying to solve. Also see @srowen's comment on the JIRA.

The sheer size in lines of code and files modified makes this a challenge to review. Is it an option to split this PR into a number of smaller PRs? The other thing I am missing is tests.

Contributor

witgo commented Aug 16, 2016

@hvanhovell
I will submit some smaller PRs and provide a more high-level description of them.

@witgo witgo closed this Aug 23, 2016
