Commit

Merge pull request #22 from gatorsmile/SPARK-24865-xiao
resolve the conflicts
rxin committed Jul 26, 2018
2 parents f2f1a97 + 7995272 commit fe52801
Showing 273 changed files with 4,288 additions and 1,584 deletions.
8 changes: 4 additions & 4 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -18,9 +18,9 @@
context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
tryCatch( checkJavaVersion(),
tryCatch(checkJavaVersion(),
error = function(e) { skip("error on Java check") },
warning = function(e) { skip("warning on Java check") } )
warning = function(e) { skip("warning on Java check") })

sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)
@@ -54,9 +54,9 @@ test_that("create DataFrame from list or data.frame", {
})

test_that("spark.glm and predict", {
tryCatch( checkJavaVersion(),
tryCatch(checkJavaVersion(),
error = function(e) { skip("error on Java check") },
warning = function(e) { skip("warning on Java check") } )
warning = function(e) { skip("warning on Java check") })

sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)
2 changes: 1 addition & 1 deletion bin/docker-image-tool.sh
@@ -135,7 +135,7 @@ BASEDOCKERFILE=
PYDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
while getopts f:mr:t:n:b: option
while getopts f:p:mr:t:n:b: option
do
case "${option}"
in
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1524,7 +1524,11 @@ class SparkContext(config: SparkConf) extends Logging {
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new Path(path).toUri
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case null => new File(path).getCanonicalFile.toURI.toString
case "local" =>
logWarning("File with 'local' scheme is not supported to add to file server, since " +
"it is already available on every node.")
return
case _ => path
}
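
For context, a short usage sketch (not part of the commit) of how the new branch behaves from a caller's point of view; the paths and app name here are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("addFile-demo"))

// No scheme: resolved to a canonical file URI and served by the driver's file server, as before.
sc.addFile("/tmp/lookup.csv")

// "local" scheme: after this change addFile only logs a warning and returns early,
// since a local: path is expected to already be present on every node.
sc.addFile("local:///opt/data/lookup.csv")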

core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -432,7 +432,11 @@ package object config {
"external shuffle service, this feature can only be worked when external shuffle" +
"service is newer than Spark 2.2.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
// fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB, so we might
// as well use fetch-to-disk in that case. The message includes some metadata in addition
// to the block data itself (in particular UploadBlock has a lot of metadata), so we leave
// extra room.
.createWithDefault(Int.MaxValue - 512)

private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")
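
A sketch of overriding the new default, assuming the entry above is the one exposed as spark.maxRemoteBlockSizeFetchToMem (the config key itself sits outside the visible hunk). The new default, Int.MaxValue - 512 = 2147483135 bytes, stays just under Netty's 2 GB message limit while leaving headroom for the UploadBlock metadata mentioned in the comment:

import org.apache.spark.SparkConf

// Assumed config key: stream any remote block larger than 512 MB to disk instead of memory.
val conf = new SparkConf()
  .set("spark.maxRemoteBlockSizeFetchToMem", (512L * 1024 * 1024).toString)
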
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -130,6 +130,8 @@ private[spark] class BlockManager(

private[spark] val externalShuffleServiceEnabled =
conf.getBoolean("spark.shuffle.service.enabled", false)
private val chunkSize =
conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt

val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.
@@ -660,6 +662,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
// could just use the inputStream on the temp file, rather than memory-mapping the file.
// Until then, replication can cause the process to use too much memory and get killed
// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
// we've read the data to disk.
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
@@ -690,7 +697,7 @@ private[spark] class BlockManager(
logDebug(s"Getting remote block $blockId from $loc")
val data = try {
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager)
} catch {
case NonFatal(e) =>
runningFailureCount += 1
@@ -724,7 +731,7 @@ private[spark] class BlockManager(
}

if (data != null) {
return Some(new ChunkedByteBuffer(data))
return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
}
logDebug(s"The value of block $blockId is null")
}
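
The chunkSize read above comes from a test-oriented setting; a minimal sketch, assuming nothing beyond the names in the hunk, of lowering it so that even small fetched-to-disk blocks exercise the multi-chunk mapping path:

import org.apache.spark.SparkConf

// Map remote file-backed blocks in 4 KB pieces instead of a single mapping of up to
// Int.MaxValue bytes; only the limit changes, the getRemoteBytes code path stays the same.
val conf = new SparkConf().set("spark.storage.memoryMapLimitForTests", "4k")
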
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -17,17 +17,21 @@

package org.apache.spark.util.io

import java.io.InputStream
import java.io.{File, FileInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel
import java.nio.channels.{FileChannel, WritableByteChannel}
import java.nio.file.StandardOpenOption

import scala.collection.mutable.ListBuffer

import com.google.common.primitives.UnsignedBytes
import io.netty.buffer.{ByteBuf, Unpooled}

import org.apache.spark.SparkEnv
import org.apache.spark.internal.config
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.util.ByteArrayWritableChannel
import org.apache.spark.storage.StorageUtils
import org.apache.spark.util.Utils

/**
* Read-only byte buffer which is physically stored as multiple chunks rather than a single
@@ -81,10 +85,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
}

/**
* Wrap this buffer to view it as a Netty ByteBuf.
* Wrap this in a custom "FileRegion" which allows us to transfer over 2 GB.
*/
def toNetty: ByteBuf = {
Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
def toNetty: ChunkedByteBufferFileRegion = {
new ChunkedByteBufferFileRegion(this, bufferWriteChunkSize)
}

/**
@@ -166,6 +170,34 @@

}

object ChunkedByteBuffer {
// TODO eliminate this method if we switch BlockManager to getting InputStreams
def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
data match {
case f: FileSegmentManagedBuffer =>
map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
case other =>
new ChunkedByteBuffer(other.nioByteBuffer())
}
}

def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
Utils.tryWithResource(FileChannel.open(file.toPath, StandardOpenOption.READ)) { channel =>
var remaining = length
var pos = offset
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
val chunkSize = math.min(remaining, maxChunkSize)
val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
pos += chunkSize
remaining -= chunkSize
chunks += chunk
}
new ChunkedByteBuffer(chunks.toArray)
}
}
}

/**
* Reads data from a ChunkedByteBuffer.
*
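
A usage sketch of the new map helper (hypothetical file and sizes, written as it would look from inside Spark, since ChunkedByteBuffer is private[spark]): each chunk is a read-only mapped buffer of at most maxChunkSize bytes, so a file larger than 2 GB becomes several chunks instead of overflowing a single ByteBuffer:

import java.io.File

import org.apache.spark.util.io.ChunkedByteBuffer

// A hypothetical ~5 GB block file fetched to disk; with maxChunkSize = Int.MaxValue this
// maps to three chunks of roughly 2 GB, 2 GB and 1 GB.
val file = new File("/tmp/large-block.bin")
val buf = ChunkedByteBuffer.map(file, Int.MaxValue, 0L, file.length())
assert(buf.size == file.length())
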
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.io

import java.nio.channels.WritableByteChannel

import io.netty.channel.FileRegion
import io.netty.util.AbstractReferenceCounted

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.AbstractFileRegion


/**
* This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty
* message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion,
* even though the data is not backed by a file.
*/
private[io] class ChunkedByteBufferFileRegion(
private val chunkedByteBuffer: ChunkedByteBuffer,
private val ioChunkSize: Int) extends AbstractFileRegion {

private var _transferred: Long = 0
// this duplicates the original chunks, so we're free to modify the position, limit, etc.
private val chunks = chunkedByteBuffer.getChunks()
private val size = chunks.foldLeft(0L) { _ + _.remaining() }

protected def deallocate: Unit = {}

override def count(): Long = size

// this is the "start position" of the overall Data in the backing file, not our current position
override def position(): Long = 0

override def transferred(): Long = _transferred

private var currentChunkIdx = 0

def transferTo(target: WritableByteChannel, position: Long): Long = {
assert(position == _transferred)
if (position == size) return 0L
var keepGoing = true
var written = 0L
var currentChunk = chunks(currentChunkIdx)
while (keepGoing) {
while (currentChunk.hasRemaining && keepGoing) {
val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
val originalLimit = currentChunk.limit()
currentChunk.limit(currentChunk.position() + ioSize)
val thisWriteSize = target.write(currentChunk)
currentChunk.limit(originalLimit)
written += thisWriteSize
if (thisWriteSize < ioSize) {
// the channel did not accept our entire write. We do *not* keep trying -- netty wants
// us to just stop, and report how much we've written.
keepGoing = false
}
}
if (keepGoing) {
// advance to the next chunk (if there are any more)
currentChunkIdx += 1
if (currentChunkIdx == chunks.size) {
keepGoing = false
} else {
currentChunk = chunks(currentChunkIdx)
}
}
}
_transferred += written
written
}
}
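
A sketch of how the region is drained (Netty does this internally; the loop below is what a test inside org.apache.spark.util.io might do, since the class is private[io]): transferTo resumes from transferred() and may write less than requested whenever the channel stops accepting bytes:

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels

// Two 1 MB chunks stand in for a buffer that could, in principle, exceed 2 GB in total.
val buf = new ChunkedByteBuffer(
  Array(ByteBuffer.allocate(1024 * 1024), ByteBuffer.allocate(1024 * 1024)))
val region = buf.toNetty
val channel = Channels.newChannel(new ByteArrayOutputStream())
var written = 0L
while (written < region.count()) {
  // Each call resumes exactly where the previous, possibly partial, write stopped.
  written += region.transferTo(channel, region.transferred())
}
assert(written == buf.size)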
