[WIP][SPARK-17082][CORE] Replace ByteBuffer with ChunkedByteBuffer #14662

Closed
witgo wants to merge 1 commit into apache:master from witgo:SPARK-17082_ByteBuffer_2_ChunkedByteBuffer

Conversation


witgo (Contributor) commented Aug 16, 2016

What changes were proposed in this pull request?

A ByteBuffer cannot be larger than 2 GB, because its capacity and positions are Java ints; it should be replaced by ChunkedByteBuffer, whose total size is a long. The proposed changes:
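For context, the 2 GB ceiling comes from java.nio.ByteBuffer taking an int capacity, so any request above Integer.MAX_VALUE bytes overflows before allocation even happens. A minimal demonstration (illustrative only, not part of this patch):

```java
import java.nio.ByteBuffer;

public class ByteBufferLimit {
    public static void main(String[] args) {
        // ByteBuffer.allocate takes an int, so capacity is capped at
        // Integer.MAX_VALUE (~2 GB). A 3 GB request overflows the int cast.
        long threeGb = 3L * 1024 * 1024 * 1024;
        System.out.println((int) threeGb); // -1073741824: the long overflowed the int

        try {
            ByteBuffer.allocate((int) threeGb); // negative capacity
        } catch (IllegalArgumentException e) {
            System.out.println("allocate rejected the overflowed capacity");
        }
    }
}
```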

  • Move class ChunkedByteBuffer, ChunkedByteBufferInputStream and ChunkedByteBufferOutputStream to common/network-common/src/main/java/org/apache/spark/network/buffer/
  • ManagedBuffer
    public abstract ByteBuffer nioByteBuffer() throws IOException; => public abstract ChunkedByteBuffer nioByteBuffer() throws IOException;
  • FileSegmentManagedBuffer
    add constructor public FileSegmentManagedBuffer(long memoryMapBytes, boolean lazyFileDescriptor, File file, long offset, long length)
  • NettyManagedBuffer
    Support Zero-copy in nioByteBuffer method
  • NioManagedBuffer
    add constructor public NioManagedBuffer(ChunkedByteBuffer buf)
  • RpcResponseCallback
    void onSuccess(ByteBuffer response) => void onSuccess(ChunkedByteBuffer response)
  • TransportClient
    public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) => public long sendRpc(ChunkedByteBuffer message, final RpcResponseCallback callback)
    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) => ChunkedByteBuffer sendRpcSync(ChunkedByteBuffer message, long timeoutMs)
    public void send(ByteBuffer message) => public void send(ChunkedByteBuffer message)
  • SaslRpcHandler
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) => public void receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback callback)
    public void receive(TransportClient client, ByteBuffer message) => public void receive(TransportClient client, ChunkedByteBuffer message)
  • NoOpRpcHandler
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) => public void receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback callback)
  • RpcHandler
    public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) => public abstract void receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback callback)
    public void receive(TransportClient client, ByteBuffer message) => public void receive(TransportClient client, ChunkedByteBuffer message)
    public void onSuccess(ByteBuffer response) => public void onSuccess(ChunkedByteBuffer response)
  • org.apache.spark.network.shuffle.protocol.Decoder
    BlockTransferMessage fromByteBuffer(ByteBuffer msg) => public static BlockTransferMessage fromByteBuffer(ChunkedByteBuffer msg)
  • TorrentBroadcast
    def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer], serializer: Serializer, compressionCodec: Option[CompressionCodec]) =>
    def unBlockifyObject[T: ClassTag](blocks: Array[ChunkedByteBuffer], serializer: Serializer, compressionCodec: Option[CompressionCodec])
  • Executor
    def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit =>
    def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ChunkedByteBuffer): Unit
  • ExecutorBackend
    def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit => def statusUpdate(taskId: Long, state: TaskState, data: ChunkedByteBuffer): Unit
  • OneWayOutboxMessage
    case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage => case class OneWayOutboxMessage(content: ChunkedByteBuffer) extends OutboxMessage
  • org.apache.spark.scheduler.Task
    serializeWithDependencies(task: Task[_], currentFiles: mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer: SerializerInstance): ByteBuffer =>
    serializeWithDependencies(task: Task[_], currentFiles: mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer: SerializerInstance): ChunkedByteBuffer
    deserializeWithDependencies(serializedTask: ByteBuffer): (HashMap[String, Long], HashMap[String, Long], Properties, ByteBuffer) =>
    deserializeWithDependencies(serializedTask: ChunkedByteBuffer): (HashMap[String, Long], HashMap[String, Long], Properties, ChunkedByteBuffer)
  • TaskDescription
    private[spark] class TaskDescription(val taskId: Long, val attemptNumber: Int, val executorId: String, val name: String, val index: Int, val serializedTask: ByteBuffer) => private[spark] class TaskDescription(val taskId: Long, val attemptNumber: Int, val executorId: String, val name: String, val index: Int, _serializedTask: ChunkedByteBuffer)
  • DirectTaskResult
    private[spark] class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T]=>
    private[spark] class DirectTaskResult[T](var valueBytes: ChunkedByteBuffer, var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T]
  • TaskResultGetter
    def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer): Unit => def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ChunkedByteBuffer): Unit
    def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ByteBuffer) => def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ChunkedByteBuffer)
  • TaskSchedulerImpl
    def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) => def statusUpdate(tid: Long, state: TaskState, serializedData: ChunkedByteBuffer)
  • CoarseGrainedClusterMessages.LaunchTask
    case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage => case class LaunchTask(data: ChunkedByteBuffer) extends CoarseGrainedClusterMessage
  • CoarseGrainedClusterMessages.StatusUpdate
    case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer) extends CoarseGrainedClusterMessage => case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: ChunkedByteBuffer) extends CoarseGrainedClusterMessage
  • org.apache.spark.scheduler.local.LocalSchedulerBackend
    private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) => private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ChunkedByteBuffer)
  • SerializerInstance
    def serialize[T: ClassTag](t: T): ByteBuffer => def serialize[T: ClassTag](t: T): ChunkedByteBuffer
    def deserialize[T: ClassTag](bytes: ByteBuffer): T => def deserialize[T: ClassTag](bytes: ChunkedByteBuffer): T
    def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T => def deserialize[T: ClassTag](bytes: ChunkedByteBuffer, loader: ClassLoader): T
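The migration above can be pictured with a minimal chunked-buffer sketch (a hypothetical class, not Spark's actual ChunkedByteBuffer implementation): an array of ByteBuffer chunks whose combined size is tracked as a long, so the logical buffer can exceed Integer.MAX_VALUE bytes while each individual chunk stays within the int limit.

```java
import java.nio.ByteBuffer;

// Minimal sketch: a read-only sequence of ByteBuffer chunks whose combined
// size is a long, so the logical buffer is not capped at 2 GB.
public class ChunkedBytes {
    private final ByteBuffer[] chunks;
    private final long size;

    public ChunkedBytes(ByteBuffer[] chunks) {
        long total = 0;
        for (ByteBuffer b : chunks) total += b.remaining();
        this.chunks = chunks;
        this.size = total;
    }

    public long size() { return size; }

    // Read the byte at a logical long offset by walking the chunk list.
    public byte get(long pos) {
        long offset = pos;
        for (ByteBuffer b : chunks) {
            if (offset < b.remaining()) return b.get(b.position() + (int) offset);
            offset -= b.remaining();
        }
        throw new IndexOutOfBoundsException(Long.toString(pos));
    }

    public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.wrap(new byte[]{1, 2, 3});
        ByteBuffer b = ByteBuffer.wrap(new byte[]{4, 5});
        ChunkedBytes cb = new ChunkedBytes(new ByteBuffer[]{a, b});
        System.out.println(cb.size()); // 5
        System.out.println(cb.get(4)); // 5
    }
}
```

This is also why the signature changes ripple so widely: every API that previously handed around a single ByteBuffer (RPC payloads, serialized tasks, task results) must now accept the chunked form end to end.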

How was this patch tested?

TODO: ....

witgo force-pushed the SPARK-17082_ByteBuffer_2_ChunkedByteBuffer branch from b6c1e3a to b229441 on August 16, 2016 08:44

SparkQA commented Aug 16, 2016

Test build #63835 has finished for PR 14662 at commit b6c1e3a.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Aug 16, 2016

Test build #63837 has finished for PR 14662 at commit b229441.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

