
Commit

Merge branch 'master' of https://github.com/apache/spark
zhzhan committed Aug 30, 2014
2 parents f6a8a40 + d90434c commit cb53a2c
Showing 53 changed files with 729 additions and 507 deletions.
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.spark.storage
+package org.apache.spark.shuffle
 
 import java.io.File
+import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkEnv, SparkConf, Logging}
+import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ShuffleManager
-import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
-import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.executor.ShuffleWriteMetrics
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -61,20 +61,18 @@ private[spark] trait ShuffleWriterGroup {
  * each block stored in each file. In order to find the location of a shuffle block, we search the
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
-// TODO: Factor this into a separate class for each ShuffleManager implementation
 
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager,
-    shuffleManager: ShuffleManager) extends Logging {
-  def conf = blockManager.conf
+class FileShuffleBlockManager(conf: SparkConf)
+  extends ShuffleBlockManager with Logging {
+
+  private lazy val blockManager = SparkEnv.get.blockManager
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
-  val consolidateShuffleFiles =
+  private val consolidateShuffleFiles =
     conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  // Are we using sort-based shuffle?
-  val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager]
-
   private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   /**
@@ -93,22 +91,11 @@ class ShuffleBlockManager(blockManager: BlockManager,
     val completedMapTasks = new ConcurrentLinkedQueue[Int]()
   }
 
-  type ShuffleId = Int
   private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
 
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
 
-  /**
-   * Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle
-   * because it just writes a single file by itself.
-   */
-  def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = {
-    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
-    val shuffleState = shuffleStates(shuffleId)
-    shuffleState.completedMapTasks.add(mapId)
-  }
-
   /**
    * Get a ShuffleWriterGroup for the given map task, which will register it as complete
    * when the writers are closed successfully
@@ -181,17 +168,30 @@ class ShuffleBlockManager(blockManager: BlockManager,
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
-   * This function should only be called if shuffle file consolidation is enabled, as it is
-   * an error condition if we don't find the expected block.
    */
-  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
-    // Search all file groups associated with this shuffle.
-    val shuffleState = shuffleStates(id.shuffleId)
-    for (fileGroup <- shuffleState.allFileGroups) {
-      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
-      if (segment.isDefined) { return segment.get }
+  private def getBlockLocation(id: ShuffleBlockId): FileSegment = {
+    if (consolidateShuffleFiles) {
+      // Search all file groups associated with this shuffle.
+      val shuffleState = shuffleStates(id.shuffleId)
+      val iter = shuffleState.allFileGroups.iterator
+      while (iter.hasNext) {
+        val segment = iter.next.getFileSegmentFor(id.mapId, id.reduceId)
+        if (segment.isDefined) { return segment.get }
+      }
+      throw new IllegalStateException("Failed to find shuffle block: " + id)
+    } else {
+      val file = blockManager.diskBlockManager.getFile(id)
+      new FileSegment(file, 0, file.length())
     }
-    throw new IllegalStateException("Failed to find shuffle block: " + id)
   }
+
+  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
+    val segment = getBlockLocation(blockId)
+    blockManager.diskStore.getBytes(segment)
+  }
+
+  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = {
+    Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]))
+  }
 
   /** Remove all the blocks / files and metadata related to a particular shuffle. */
@@ -207,14 +207,7 @@ class ShuffleBlockManager(blockManager: BlockManager,
   private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
     shuffleStates.get(shuffleId) match {
       case Some(state) =>
-        if (sortBasedShuffle) {
-          // There's a single block ID for each map, plus an index file for it
-          for (mapId <- state.completedMapTasks) {
-            val blockId = new ShuffleBlockId(shuffleId, mapId, 0)
-            blockManager.diskBlockManager.getFile(blockId).delete()
-            blockManager.diskBlockManager.getFile(blockId.name + ".index").delete()
-          }
-        } else if (consolidateShuffleFiles) {
+        if (consolidateShuffleFiles) {
           for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
             file.delete()
           }
@@ -240,13 +233,13 @@ class ShuffleBlockManager(blockManager: BlockManager,
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
 
-  def stop() {
+  override def stop() {
    metadataCleaner.cancel()
   }
 }
 
 private[spark]
-object ShuffleBlockManager {
+object FileShuffleBlockManager {
   /**
    * A group of shuffle files, one per reducer.
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
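To make the branch in getBlockLocation concrete: with spark.shuffle.consolidateFiles left at its default of false, each shuffle block maps onto its own file, whose whole extent becomes the FileSegment; when set to true, the manager probes each ShuffleFileGroup for the (mapId, reduceId) pair. A minimal, self-contained sketch of the consolidated lookup (Segment, FileGroup, and the sample file name are illustrative stand-ins, not the real FileSegment/ShuffleFileGroup):

object ConsolidatedLookupSketch {
  // Illustrative stand-ins for FileSegment and ShuffleFileGroup.
  case class Segment(file: String, offset: Long, length: Long)

  class FileGroup(entries: Map[(Int, Int), Segment]) {
    def getFileSegmentFor(mapId: Int, reduceId: Int): Option[Segment] =
      entries.get((mapId, reduceId))
  }

  // Probe each file group until one knows this (mapId, reduceId) pair.
  def locate(groups: Seq[FileGroup], mapId: Int, reduceId: Int): Segment =
    groups.iterator
      .map(_.getFileSegmentFor(mapId, reduceId))
      .collectFirst { case Some(segment) => segment }
      .getOrElse(throw new IllegalStateException(
        s"Failed to find shuffle block: ($mapId, $reduceId)"))

  def main(args: Array[String]): Unit = {
    val groups = Seq(new FileGroup(Map((0, 1) -> Segment("merged_shuffle_0", 100L, 50L))))
    println(locate(groups, 0, 1)) // Segment(merged_shuffle_0,100,50)
  }
}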
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle

import java.io._
import java.nio.ByteBuffer

import org.apache.spark.SparkEnv
import org.apache.spark.storage._

/**
 * Create and maintain the shuffle blocks' mapping between logical blocks and physical file
 * locations. Data for shuffle blocks from the same map task is stored in a single consolidated
 * data file, and the offsets of the data blocks within that file are stored in a separate
 * index file.
 *
 * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0, adding ".data"
 * as the filename suffix for the data file and ".index" for the index file.
 */
private[spark]
class IndexShuffleBlockManager extends ShuffleBlockManager {

  private lazy val blockManager = SparkEnv.get.blockManager

  /**
   * Mapping to a single shuffleBlockId with reduce ID 0.
   */
  def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
    ShuffleBlockId(shuffleId, mapId, 0)
  }

  def getDataFile(shuffleId: Int, mapId: Int): File = {
    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
  }

  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
  }

  /**
   * Remove the data file and index file that contain the output data from one map.
   */
  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
    var file = getDataFile(shuffleId, mapId)
    if (file.exists()) {
      file.delete()
    }

    file = getIndexFile(shuffleId, mapId)
    if (file.exists()) {
      file.delete()
    }
  }

  /**
   * Write an index file with the offsets of each block, plus a final offset at the end
   * for the end of the output file. This will be used by getBlockLocation to figure out
   * where each block begins and ends.
   */
  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]) = {
    val indexFile = getIndexFile(shuffleId, mapId)
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    try {
      // We take in the lengths of each block and convert them to cumulative offsets.
      var offset = 0L
      out.writeLong(offset)

      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
  }
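  // Illustrative example (not part of the original source): lengths Array(10L, 20L, 30L)
  // produce index entries 0, 10, 30, 60, so block i spans [index(i), index(i + 1))
  // within the data file.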

  /**
   * Get the location of a block in a map output file. Uses the index file we create for it.
   */
  private def getBlockLocation(blockId: ShuffleBlockId): FileSegment = {
    // The block is actually going to be a range of a single map output file for this map, so
    // find out the consolidated file, then the offset within that from our index
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skip(blockId.reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      new FileSegment(getDataFile(blockId.shuffleId, blockId.mapId), offset, nextOffset - offset)
    } finally {
      in.close()
    }
  }
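  // Illustrative example (not part of the original source): with index entries 0, 10, 30, 60,
  // reduceId = 1 skips 8 bytes, reads offset = 10 and nextOffset = 30, and yields a
  // 20-byte FileSegment starting at byte 10 of the map's data file.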

  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
    val segment = getBlockLocation(blockId)
    blockManager.diskStore.getBytes(segment)
  }

  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = {
    Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]))
  }

  override def stop() = {}
}
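A self-contained sketch of the index-file round trip implemented above (the temp-file name, lengths, and reduceId are illustrative; the real manager resolves paths through DiskBlockManager):

import java.io._

object IndexFormatDemo {
  def main(args: Array[String]): Unit = {
    val lengths = Array(10L, 20L, 30L)
    val indexFile = File.createTempFile("shuffle_0_0_0", ".index")

    // Write side: one leading zero plus one cumulative offset per block.
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    try {
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }

    // Read side: block reduceId spans [offset, nextOffset) in the data file.
    val reduceId = 1
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skip(reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      println(s"block $reduceId: offset=$offset length=${nextOffset - offset}") // offset=10 length=20
    } finally {
      in.close()
    }
    indexFile.delete()
  }
}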
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle

import java.nio.ByteBuffer

import org.apache.spark.storage.{FileSegment, ShuffleBlockId}

private[spark]
trait ShuffleBlockManager {
  type ShuffleId = Int

  /**
   * Get shuffle block data managed by the local ShuffleBlockManager.
   * @return Some(ByteBuffer) if the block was found, otherwise None.
   */
  def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]

  def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer]

  def stop(): Unit
}
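For illustration, a minimal in-memory implementation of this trait could look as follows (hypothetical sketch; the InMemoryShuffleBlockManager class and putForTest helper are not part of Spark — the real implementations are FileShuffleBlockManager and IndexShuffleBlockManager above):

package org.apache.spark.shuffle

import java.nio.ByteBuffer

import scala.collection.mutable

import org.apache.spark.storage.{FileSegment, ShuffleBlockId}

private[spark] class InMemoryShuffleBlockManager extends ShuffleBlockManager {
  private val blocks = mutable.Map.empty[ShuffleBlockId, ByteBuffer]

  // Hypothetical helper for seeding data; not part of the trait.
  def putForTest(id: ShuffleBlockId, bytes: ByteBuffer): Unit = blocks(id) = bytes

  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = blocks.get(blockId)

  // No backing file, so always answer with the in-memory buffer.
  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] =
    Right(blocks(blockId))

  override def stop(): Unit = blocks.clear()
}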

@@ -49,8 +49,13 @@ private[spark] trait ShuffleManager {
       endPartition: Int,
       context: TaskContext): ShuffleReader[K, C]
 
-  /** Remove a shuffle's metadata from the ShuffleManager. */
-  def unregisterShuffle(shuffleId: Int)
+  /**
+   * Remove a shuffle's metadata from the ShuffleManager.
+   * @return true if the metadata was removed successfully, otherwise false.
+   */
+  def unregisterShuffle(shuffleId: Int): Boolean
+
+  def shuffleBlockManager: ShuffleBlockManager
 
   /** Shut down this ShuffleManager. */
   def stop(): Unit
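Because unregisterShuffle now reports success, callers can react when metadata was not removed. A hedged sketch of such a call site (the helper object and error handling are hypothetical, not from this commit):

package org.apache.spark.shuffle

// Hypothetical call site: unregister and surface failed removals to the caller.
private[spark] object ShuffleCleanupSketch {
  def cleanShuffle(manager: ShuffleManager, shuffleId: Int): Boolean = {
    val removed = manager.unregisterShuffle(shuffleId)
    if (!removed) {
      System.err.println(s"Failed to remove metadata for shuffle $shuffleId")
    }
    removed
  }
}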
@@ -25,6 +25,9 @@ import org.apache.spark.shuffle._
  * mapper (possibly reusing these across waves of tasks).
  */
 private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
+
+  private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+
   /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
   override def registerShuffle[K, V, C](
       shuffleId: Int,
@@ -49,12 +52,21 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
   /** Get a writer for a given partition. Called on executors by map tasks. */
   override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
       : ShuffleWriter[K, V] = {
-    new HashShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+    new HashShuffleWriter(
+      shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
   }
 
   /** Remove a shuffle's metadata from the ShuffleManager. */
-  override def unregisterShuffle(shuffleId: Int): Unit = {}
+  override def unregisterShuffle(shuffleId: Int): Boolean = {
+    shuffleBlockManager.removeShuffle(shuffleId)
+  }
+
+  override def shuffleBlockManager: FileShuffleBlockManager = {
+    fileShuffleBlockManager
+  }
 
   /** Shut down this ShuffleManager. */
-  override def stop(): Unit = {}
+  override def stop(): Unit = {
+    shuffleBlockManager.stop()
+  }
 }
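The design choice here is constructor injection: the manager owns one FileShuffleBlockManager, hands it to every HashShuffleWriter, exposes it via shuffleBlockManager, and tears it down in stop(), instead of each writer pulling a shared instance off the BlockManager. A hedged wiring sketch (hypothetical helper object; the configuration value is illustrative, and HashShuffleManager is private[spark], so this only compiles inside Spark):

package org.apache.spark.shuffle.hash

import org.apache.spark.SparkConf

private[spark] object HashShuffleWiringSketch {
  def wire(): Unit = {
    val conf = new SparkConf()
      .set("spark.shuffle.consolidateFiles", "true") // opt in to consolidated shuffle files
    val manager = new HashShuffleManager(conf)
    val blocks = manager.shuffleBlockManager // the FileShuffleBlockManager created above
    manager.unregisterShuffle(0)             // delegates to removeShuffle(0); returns Boolean
    manager.stop()                           // also stops the block manager
  }
}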
@@ -17,14 +17,15 @@
 
 package org.apache.spark.shuffle.hash
 
-import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
-import org.apache.spark.{Logging, MapOutputTracker, SparkEnv, TaskContext}
-import org.apache.spark.storage.{BlockObjectWriter}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle._
+import org.apache.spark.storage.BlockObjectWriter
 
 private[spark] class HashShuffleWriter[K, V](
+    shuffleBlockManager: FileShuffleBlockManager,
     handle: BaseShuffleHandle[K, V, _],
     mapId: Int,
     context: TaskContext)
@@ -43,7 +44,6 @@ private[spark] class HashShuffleWriter[K, V](
   metrics.shuffleWriteMetrics = Some(writeMetrics)
 
   private val blockManager = SparkEnv.get.blockManager
-  private val shuffleBlockManager = blockManager.shuffleBlockManager
   private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
   private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
     writeMetrics)