Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7263] Add new shuffle manager which stores shuffle blocks in Parquet #7265

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
Expand Down
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ class SparkEnv (
object SparkEnv extends Logging {
@volatile private var env: SparkEnv = _

// Let the user specify short names for shuffle managers
val shuffleManagerAliases = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager",
"parquet" -> "org.apache.spark.shuffle.parquet.ParquetShuffleManager")

private[spark] val driverActorSystemName = "sparkDriver"
private[spark] val executorActorSystemName = "sparkExecutor"

Expand Down Expand Up @@ -314,13 +321,9 @@ object SparkEnv extends Logging {
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleMgrClass = shuffleManagerAliases
.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.parquet

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.shuffle._
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.{ShuffleDependency, TaskContext}

class ErrorShuffleManager extends ShuffleManager {

private def throwError(error: String) = {
throw new NotImplementedError(
s"${ParquetShuffleConfig.fallbackShuffleManager} not defined: ${error}")
}

/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
override def registerShuffle[K, V, C](shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
throwError(s"Unable to register shuffle for keyClass=${dependency.keyClassName} " +
s"valueClass=${dependency.valueClassName} combineClass=${dependency.combinerClassName}")
}

/**
* Return a resolver capable of retrieving shuffle block data based on block coordinates.
*/
override def shuffleBlockResolver: ShuffleBlockResolver = new ShuffleBlockResolver {

override def stop(): Unit = {} // no-op

/**
* Retrieve the data for the specified block. If the data for that block is not available,
* throws an unspecified exception.
*/
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
throwError(s"Unable to get block data for ${blockId}")
}
}

/** Shut down this ShuffleManager. */
override def stop(): Unit = {} // no-op

/**
* Remove a shuffle's metadata from the ShuffleManager.
* @return true if the metadata removed successfully, otherwise false.
*/
override def unregisterShuffle(shuffleId: Int): Boolean = {
throwError("Unable to unregister shuffle")
}

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
throwError("Unable to get a writer")
}

/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
override def getReader[K, C](handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
throwError("Unable to get a reader")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.parquet

import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkEnv}

object ParquetShuffleConfig {
private val sparkManagerConfKey = "spark.shuffle.manager"
private val parquetManagerAlias = "parquet"
private val namespace = "spark.shuffle.parquet."
private val compressionKey = namespace + "compression"
private val blocksizeKey = namespace + "blocksize"
private val pagesizeKey = namespace + "pagesize"
private val enableDictionaryKey = namespace + "enabledictionary"
private[parquet] val fallbackShuffleManager = namespace + "fallback"

def isParquetShuffleEnabled: Boolean = {
isParquetShuffleEnabled(SparkEnv.get.conf)
}

def isParquetShuffleEnabled(conf: SparkConf): Boolean = {
val confValue = conf.get(sparkManagerConfKey, "")
confValue == parquetManagerAlias || confValue == classOf[ParquetShuffleManager].getName
}

def enableParquetShuffle(): Unit = {
enableParquetShuffle(SparkEnv.get.conf)
}

def enableParquetShuffle(conf: SparkConf): Unit = {
conf.set(ParquetShuffleConfig.sparkManagerConfKey, classOf[ParquetShuffleManager].getName)
}

def getCompression: CompressionCodecName = {
getCompression(SparkEnv.get.conf)
}

def getCompression(conf: SparkConf): CompressionCodecName = {
val confValue = conf.get(compressionKey, null)
if (confValue == null) {
ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME
} else {
CompressionCodecName.fromConf(confValue)
}
}

def getBlockSize: Int = {
getBlockSize(SparkEnv.get.conf)
}

def getBlockSize(conf: SparkConf): Int = {
val confValue = conf.get(blocksizeKey, null)
if (confValue == null) {
ParquetWriter.DEFAULT_BLOCK_SIZE
} else {
confValue.toInt
}
}

def getPageSize: Int = {
getPageSize(SparkEnv.get.conf)
}

def getPageSize(conf: SparkConf): Int = {
val confValue = conf.get(pagesizeKey, null)
if (confValue == null) {
ParquetWriter.DEFAULT_PAGE_SIZE
} else {
confValue.toInt
}
}

def isDictionaryEnabled: Boolean = {
isDictionaryEnabled(SparkEnv.get.conf)
}

def isDictionaryEnabled(conf: SparkConf): Boolean = {
val confValue = conf.get(enableDictionaryKey, null)
if (confValue == null) {
ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED
} else {
confValue.toBoolean
}
}

def setFallbackShuffleManager(managerName: String): Unit = {
setFallbackShuffleManager(SparkEnv.get.conf, managerName)
}

def setFallbackShuffleManager(conf: SparkConf, managerName: String): Unit = {
conf.set(fallbackShuffleManager, managerName)
}

def getFallbackShuffleManager: ShuffleManager = {
getFallbackShuffleManager(SparkEnv.get.conf)
}

def getFallbackShuffleManager(conf: SparkConf): ShuffleManager = {
val confValue = conf.get(fallbackShuffleManager, null)
if (confValue == null) {
new ErrorShuffleManager
} else {
val fullName = SparkEnv.shuffleManagerAliases.getOrElse(confValue, confValue)
val cls = Utils.classForName(fullName)
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[ShuffleManager]
}
}

}
Loading