Skip to content

Commit

Permalink
Add more error handling and tests for error cases
Browse files Browse the repository at this point in the history
  • Loading branch information
mateiz committed Jul 30, 2014
1 parent bbf359d commit cc52caf
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ private[spark] class SortShuffleManager extends ShuffleManager {

/** Get the location of a block in a map output file. Uses the index file we create for it. */
def getBlockLocation(blockId: ShuffleBlockId, diskManager: DiskBlockManager): FileSegment = {
// The block is actually going to be a range of a single map output file for this map,
// so
// The block is actually going to be a range of a single map output file for this map, so
// so figure out the ID of the consolidated file, then the offset within that from our index
val realId = ShuffleBlockId(blockId.shuffleId, blockId.mapId, 0)
val indexFile = diskManager.getFile(realId.name + ".index")
val in = new DataInputStream(new FileInputStream(indexFile))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.spark.shuffle.sort

import java.io.{BufferedOutputStream, File, FileOutputStream, DataOutputStream}

import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.storage.ShuffleBlockId
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.executor.ShuffleWriteMetrics
import java.io.{BufferedOutputStream, FileOutputStream, DataOutputStream}

private[spark] class SortShuffleWriter[K, V, C](
handle: BaseShuffleHandle[K, V, C],
Expand All @@ -35,17 +35,15 @@ private[spark] class SortShuffleWriter[K, V, C](

private val dep = handle.dependency
private val numPartitions = dep.partitioner.numPartitions
private val metrics = context.taskMetrics

private val blockManager = SparkEnv.get.blockManager
private val shuffleBlockManager = blockManager.shuffleBlockManager
private val diskBlockManager = blockManager.diskBlockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))

private val conf = SparkEnv.get.conf
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null

private var stopping = false
private var mapStatus: MapStatus = null
Expand All @@ -72,7 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
// Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
// serve different ranges of this file using an index file that we create at the end.
val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
val shuffleFile = blockManager.diskBlockManager.getFile(blockId)
outputFile = blockManager.diskBlockManager.getFile(blockId)

// Track location of each range in the output file
val offsets = new Array[Long](numPartitions + 1)
Expand All @@ -84,7 +82,7 @@ private[spark] class SortShuffleWriter[K, V, C](

for ((id, elements) <- partitions) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, shuffleFile, ser, fileBufferSize)
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize)
for (elem <- elements) {
writer.write(elem)
}
Expand Down Expand Up @@ -125,8 +123,6 @@ private[spark] class SortShuffleWriter[K, V, C](

mapStatus = new MapStatus(blockManager.blockManagerId,
lengths.map(MapOutputTracker.compressSize))

// TODO: keep track of our file in a way that can be cleaned up later
}

/** Close this writer, passing along whether the map completed */
Expand All @@ -139,11 +135,17 @@ private[spark] class SortShuffleWriter[K, V, C](
if (success) {
return Option(mapStatus)
} else {
// TODO: clean up our file
// The map task failed, so delete our output file if we created one
if (outputFile != null) {
outputFile.delete()
}
return None
}
} finally {
// TODO: sorter.stop()
// Clean up our sorter, which may have its own intermediate files
if (sorter != null) {
sorter.stop()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,18 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
getBlockLocation(blockId).file.exists()
}

/** List all the blocks currently stored on disk by the disk manager. */
def getAllBlocks(): Seq[BlockId] = {
/** List all the files currently stored on disk by the disk manager. */
def getAllFiles(): Seq[File] = {
// Get all the files inside the array of array of directories
subDirs.flatten.filter(_ != null).flatMap { dir =>
val files = dir.list()
val files = dir.listFiles()
if (files != null) files else Seq.empty
}.map(BlockId.apply)
}
}

/** List all the blocks currently stored on disk by the disk manager. */
def getAllBlocks(): Seq[BlockId] = {
getAllFiles().map(f => BlockId(f.getName))
}

/** Produces a unique block id and File suitable for intermediate results. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ private[spark] class ExternalSorter[K, V, C](
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
// TODO: Would prefer to have an ArrayBuffer[Any] that we sort pairs of adjacent elements in.
var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
var buffer = new SizeTrackingBuffer[((Int, K), C)]

Expand Down Expand Up @@ -187,7 +186,7 @@ private[spark] class ExternalSorter[K, V, C](
val batchSizes = new ArrayBuffer[Long]

// How many elements we have in each partition
// TODO: this should become a sparser data structure
// TODO: this could become a sparser data structure
val elementsPerPartition = new Array[Long](numPartitions)

// Flush the disk writer's contents to disk, and update relevant variables
Expand Down Expand Up @@ -220,9 +219,11 @@ private[spark] class ExternalSorter[K, V, C](
if (objectsWritten > 0) {
flush()
}
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
writer.close()
} catch {
case e: Exception =>
writer.close()
file.delete()
}

if (usingMap) {
Expand Down Expand Up @@ -267,6 +268,12 @@ private[spark] class ExternalSorter[K, V, C](
val fileStream = new FileInputStream(spill.file)
val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)

// Track which partition and which batch stream we're in
var partitionId = 0
var indexInPartition = -1L // Just to make sure we start at index 0
var batchStreamsRead = 0
var indexInBatch = 0

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
var batchStream = nextBatchStream()
Expand All @@ -275,21 +282,10 @@ private[spark] class ExternalSorter[K, V, C](
var nextItem: (K, C) = null
var finished = false

// Track which partition and which batch stream we're in
var partitionId = 0
var indexInPartition = -1L // Just to make sure we start at index 0
var batchStreamsRead = 0
var indexInBatch = -1

/** Construct a stream that only reads from the next batch */
def nextBatchStream(): InputStream = {
if (batchStreamsRead < spill.serializerBatchSizes.length) {
batchStreamsRead += 1
ByteStreams.limit(bufferedStream, spill.serializerBatchSizes(batchStreamsRead - 1))
} else {
// No more batches left
bufferedStream
}
batchStreamsRead += 1
ByteStreams.limit(bufferedStream, spill.serializerBatchSizes(batchStreamsRead - 1))
}

/**
Expand All @@ -304,6 +300,8 @@ private[spark] class ExternalSorter[K, V, C](
if (finished) {
return null
}
val k = deserStream.readObject().asInstanceOf[K]
val c = deserStream.readObject().asInstanceOf[C]
// Start reading the next batch if we're done with this one
indexInBatch += 1
if (indexInBatch == serializerBatchSize) {
Expand All @@ -318,10 +316,9 @@ private[spark] class ExternalSorter[K, V, C](
partitionId += 1
indexInPartition = 0
}
val k = deserStream.readObject().asInstanceOf[K]
val c = deserStream.readObject().asInstanceOf[C]
if (partitionId == numPartitions - 1 &&
indexInPartition == spill.elementsPerPartition(partitionId) - 1) {
// This is the last element, remember that we're done
finished = true
deserStream.close()
}
Expand Down Expand Up @@ -382,7 +379,10 @@ private[spark] class ExternalSorter[K, V, C](
*/
def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)

def stop(): Unit = ???
def stop(): Unit = {
spills.foreach(s => s.file.delete())
spills.clear()
}

def memoryBytesSpilled: Long = _memoryBytesSpilled

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ package org.apache.spark.util.collection

import org.scalatest.FunSuite

import org.apache.spark.{SparkContext, SparkConf, LocalSparkContext}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.collection.mutable.ArrayBuffer
import scala.Some

class ExternalSorterSuite extends FunSuite with LocalSparkContext {

test("spilling in local cluster") {
ignore("spilling in local cluster") {
val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
Expand Down Expand Up @@ -77,4 +76,83 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
}
}

test("cleanup of intermediate files in sorter") {
val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager

val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
sorter.write((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
sorter.stop()
assert(diskBlockManager.getAllBlocks().length === 0)

val sorter2 = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
sorter2.write((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
sorter2.stop()
assert(diskBlockManager.getAllBlocks().length === 0)
}

test("cleanup of intermediate files in sorter if there are errors") {
val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager

val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
intercept[SparkException] {
sorter.write((0 until 100000).iterator.map(i => {
if (i == 99990) {
throw new SparkException("Intentional failure")
}
(i, i)
}))
}
assert(diskBlockManager.getAllFiles().length > 0)
sorter.stop()
assert(diskBlockManager.getAllBlocks().length === 0)
}

test("cleanup of intermediate files in shuffle") {
val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager

val data = sc.parallelize(0 until 100000, 2).map(i => (i, i))
assert(data.reduceByKey(_ + _).count() === 100000)

// After the shuffle, there should be only 4 files on disk: our two map output files and
// their index files. All other intermediate files should've been deleted.
assert(diskBlockManager.getAllFiles().length === 4)
}

test("cleanup of intermediate files in shuffle with errors") {
val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager

val data = sc.parallelize(0 until 100000, 2).map(i => {
if (i == 99990) {
throw new Exception("Intentional failure")
}
(i, i)
})
intercept[SparkException] {
data.reduceByKey(_ + _).count()
}

// After the shuffle, there should be only 2 files on disk: the output of task 1 and its index.
// All other files (map 2's output and intermediate merge files) should've been deleted.
assert(diskBlockManager.getAllFiles().length === 2)
}
}

0 comments on commit cc52caf

Please sign in to comment.