Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
jkbradley committed Aug 1, 2014
2 parents 93953f1 + 78f2af5 commit 6873fa9
Showing 10 changed files with 169 additions and 66 deletions.
@@ -65,23 +65,25 @@ private[spark] class HashShuffleWriter[K, V](
}

/** Close this writer, passing along whether the map completed */
override def stop(success: Boolean): Option[MapStatus] = {
override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
var success = initiallySuccess
try {
if (stopping) {
return None
}
stopping = true
if (success) {
try {
return Some(commitWritesAndBuildStatus())
Some(commitWritesAndBuildStatus())
} catch {
case e: Exception =>
success = false
revertWrites()
throw e
}
} else {
revertWrites()
return None
None
}
} finally {
// Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
var totalBytes = 0L
var totalTime = 0L
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
writer.commitAndClose()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
private def revertWrites(): Unit = {
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
writer.revertPartialWritesAndClose()
}
}
}
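
For readers following the stop() change above, here is a rough sketch of the caller-side contract it preserves; the helper name and record type are invented, and write()'s exact signature is assumed from the surrounding shuffle code rather than shown in this diff.

// Illustrative caller only, not part of this commit.
def runShuffleTask[K, V](
    writer: HashShuffleWriter[K, V],
    records: Iterator[Product2[K, V]]): Option[MapStatus] = {
  try {
    writer.write(records)
    // stop(true) commits and returns a MapStatus; if the commit itself throws,
    // the writer reverts its partial output before rethrowing.
    writer.stop(true)
  } catch {
    case e: Exception =>
      // stop(false) reverts any partial writes and returns None.
      writer.stop(false)
      throw e
  }
}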
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
for (elem <- elements) {
writer.write(elem)
}
writer.commit()
writer.close()
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
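
As a quick illustration of the bookkeeping in this hunk (all byte counts invented), the offsets array has one more entry than lengths, and adjacent offsets differ by the recorded length:

// Invented sizes: two partitions writing 100 and 40 bytes into one output file.
val offsets = Array(0L, 100L, 140L)   // offsets(id + 1) = segment.offset + segment.length
val lengths = Array(100L, 40L)        // lengths(id)     = segment.length
assert((0 until lengths.length).forall(id => offsets(id + 1) - offsets(id) == lengths(id)))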
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
def isOpen: Boolean

/**
* Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit.
* Flush the partial writes and commit them as a single atomic block.
*/
def commit(): Long
def commitAndClose(): Unit

/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
* when there are runtime exceptions.
* when there are runtime exceptions. This method will not throw, though it may be
* unsuccessful in truncating written data.
*/
def revertPartialWrites()
def revertPartialWritesAndClose()

/**
* Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

/**
* Returns the file segment of committed data that this Writer has written.
* This is only valid after commitAndClose() has been called.
*/
def fileSegment(): FileSegment
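
To make the revised contract concrete, here is a minimal sketch of how a caller might drive the new API: write, then commitAndClose() on success or revertPartialWritesAndClose() on failure. The helper below is illustrative only and not part of this commit.

// Illustrative helper; assumes access to the private[spark] writer classes.
def writeBlock(writer: BlockObjectWriter, records: Iterator[Any]): FileSegment = {
  try {
    records.foreach(writer.write)
    writer.commitAndClose()        // flushes and closes; fileSegment()/bytesWritten become valid
    writer.fileSegment()
  } catch {
    case e: Exception =>
      writer.revertPartialWritesAndClose()   // best-effort truncation; documented not to throw
      throw e
  }
}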

@@ -108,15 +109,14 @@ private[spark] class DiskBlockObjectWriter(
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private val initialPosition = file.length()
private var lastValidPosition = initialPosition
private var finalPosition: Long = -1
private var initialized = false
private var _timeWriting = 0L

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(

override def isOpen: Boolean = objOut != null

override def commit(): Long = {
override def commitAndClose(): Unit = {
if (initialized) {
// NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
lastValidPosition = channel.position()
lastValidPosition - prevPos
} else {
// lastValidPosition is zero if stream is uninitialized
lastValidPosition
close()
}
finalPosition = file.length()
}

override def revertPartialWrites() {
if (initialized) {
// Discard current writes. We do this by flushing the outstanding writes and
// truncate the file to the last valid position.
objOut.flush()
bs.flush()
channel.truncate(lastValidPosition)
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
if (initialized) {
objOut.flush()
bs.flush()
close()
}

val truncateStream = new FileOutputStream(file, true)
try {
truncateStream.getChannel.truncate(initialPosition)
} finally {
truncateStream.close()
}
} catch {
case e: Exception =>
logError("Uncaught exception while reverting partial writes to file " + file, e)
}
}

@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(

// Only valid if called after commit()
override def bytesWritten: Long = {
lastValidPosition - initialPosition
assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
finalPosition - initialPosition
}
}
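
A small worked example of the new finalPosition bookkeeping, with invented sizes; it only restates what commitAndClose() and bytesWritten now compute.

// Invented sizes: the writer appends to an existing file,
// so bytesWritten measures only this writer's own output.
val initialPosition = 4096L                            // file.length() when the writer was created
val finalPosition   = initialPosition + 512L           // file.length() recorded by commitAndClose()
val bytesWritten    = finalPosition - initialPosition  // valid only once finalPosition != -1
assert(bytesWritten == 512L)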
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
if (consolidateShuffleFiles) {
if (success) {
val offsets = writers.map(_.fileSegment().offset)
fileGroup.recordMapOutput(mapId, offsets)
val lengths = writers.map(_.fileSegment().length)
fileGroup.recordMapOutput(mapId, offsets, lengths)
}
recycleFileGroup(fileGroup)
} else {
@@ -247,47 +248,48 @@ object ShuffleBlockManager {
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
*/
private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
private var numBlocks: Int = 0

/**
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
*/
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

/**
* Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
* This ordering allows us to compute block lengths by examining the following block offset.
* Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
* position in the file.
* Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
* reducer.
*/
private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}

def numBlocks = mapIdToIndex.size
private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}

def apply(bucketId: Int) = files(bucketId)

def recordMapOutput(mapId: Int, offsets: Array[Long]) {
def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
assert(offsets.length == lengths.length)
mapIdToIndex(mapId) = numBlocks
numBlocks += 1
for (i <- 0 until offsets.length) {
blockOffsetsByReducer(i) += offsets(i)
blockLengthsByReducer(i) += lengths(i)
}
}

/** Returns the FileSegment associated with the given map task, or None if no entry exists. */
def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
val file = files(reducerId)
val blockOffsets = blockOffsetsByReducer(reducerId)
val blockLengths = blockLengthsByReducer(reducerId)
val index = mapIdToIndex.getOrElse(mapId, -1)
if (index >= 0) {
val offset = blockOffsets(index)
val length =
if (index + 1 < numBlocks) {
blockOffsets(index + 1) - offset
} else {
file.length() - offset
}
assert(length >= 0)
val length = blockLengths(index)
Some(new FileSegment(file, offset, length))
} else {
None
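
A brief sketch, with made-up sizes, of how the lookup above now resolves a segment: both the offset and the length come straight from the per-reducer vectors recorded at commit time, instead of deriving the length from the next block's offset or the file length.

// Made-up sizes for one reducer file holding two committed map outputs.
val blockOffsets = Array(0L, 100L)   // stored by recordMapOutput, as before
val blockLengths = Array(100L, 40L)  // now stored explicitly at the same time
val index = 1                        // mapIdToIndex(mapId) for the mapper being looked up
val offset = blockOffsets(index)
val length = blockLengths(index)     // previously derived from the next offset or file.length()
assert(offset == 100L && length == 40L)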
35 changes: 22 additions & 13 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -286,17 +286,23 @@ private[spark] object Utils extends Logging {
out: OutputStream,
closeStreams: Boolean = false)
{
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
out.write(buf, 0, n)
try {
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
out.write(buf, 0, n)
}
}
} finally {
if (closeStreams) {
try {
in.close()
} finally {
out.close()
}
}
}
if (closeStreams) {
in.close()
out.close()
}
}
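
A short usage sketch of the hardened copyStream (paths are placeholders, and it assumes access to the private[spark] Utils object): with closeStreams = true, both streams are closed even when the copy throws, and the output stream is closed even if closing the input fails.

// Placeholder paths, for illustration only.
import java.io.{FileInputStream, FileOutputStream}

val in  = new FileInputStream("/tmp/source.bin")
val out = new FileOutputStream("/tmp/dest.bin")
// If the copy loop throws, the finally block still closes both streams,
// and out.close() runs even when in.close() fails.
Utils.copyStream(in, out, closeStreams = true)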

@@ -868,9 +874,12 @@ private[spark] object Utils extends Logging {
val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
val stream = new FileInputStream(file)

stream.skip(effectiveStart)
stream.read(buff)
stream.close()
try {
stream.skip(effectiveStart)
stream.read(buff)
} finally {
stream.close()
}
Source.fromBytes(buff).mkString
}

@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](

// Flush the disk writer's contents to disk, and update relevant variables
def flush() = {
writer.commit()
writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
// How many elements we have in each partition
val elementsPerPartition = new Array[Long](numPartitions)

// Flush the disk writer's contents to disk, and update relevant variables
// Flush the disk writer's contents to disk, and update relevant variables.
// The writer is closed at the end of this process, and cannot be reused.
def flush() = {
writer.commit()
writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](

if (objectsWritten == serializerBatchSize) {
flush()
writer.close()
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
}
}
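
A simplified sketch of the batching pattern in this hunk (the batch size, record type, and newWriter helper are invented): because commitAndClose() closes the writer, each flushed batch must be followed by a fresh disk writer before more records are written, which is why the explicit writer.close() call above was dropped.

// Schematic only: newWriter() stands in for blockManager.getDiskWriter(...).
def spillInBatches(records: Iterator[Any], batchSize: Int, newWriter: () => BlockObjectWriter): Unit = {
  var writer = newWriter()
  var objectsWritten = 0
  for (rec <- records) {
    writer.write(rec)
    objectsWritten += 1
    if (objectsWritten == batchSize) {
      writer.commitAndClose()   // closes the writer; it cannot be reused
      writer = newWriter()      // a fresh writer is needed, as in the diff above
      objectsWritten = 0
    }
  }
  if (objectsWritten > 0) writer.commitAndClose()
}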
