From 0a4c0289176b09c0934d73411c42dca983daf264 Mon Sep 17 00:00:00 2001
From: Matt Massie
Date: Tue, 7 Jul 2015 12:52:59 -0700
Subject: [PATCH] [SPARK-7263] Add new shuffle manager which stores shuffle blocks in Parquet

This commit adds a new Spark shuffle manager that reads and writes shuffle data to Apache Parquet files. Parquet exposes a File interface (not a streaming interface) because it is column-oriented and seeks within a file for metadata, e.g. schemas and statistics. As such, this implementation fetches remote data into local, temporary blocks before the data is passed to Parquet for reading.

This manager uses the following Spark configuration parameters to configure Parquet: spark.shuffle.parquet.{compression, blocksize, pagesize, enabledictionary}.

There is a spark.shuffle.parquet.fallback configuration option which allows users to specify a fallback shuffle manager. If the Parquet manager finds that the classes being shuffled carry no schema information, and therefore cannot be stored as Parquet, it falls back to the specified manager. With this PR, only Avro IndexedRecords are supported in the Parquet shuffle; however, it is straightforward to extend this to other serialization systems that Parquet supports, e.g. Apache Thrift. If spark.shuffle.parquet.fallback is not defined, any shuffle objects that are incompatible with Parquet cause an error to be thrown which lists the incompatible classes.

Because the ShuffleDependency forwards the key, value and combiner class information, a full schema can be generated before the first read/write. This allows for fewer errors (since reflection isn't used) and makes support for null values possible without complex code.

The ExternalSorter, if needed, is set up not to spill to disk when Parquet is used. In the future, an ExternalSorter would need to be created that can read/write Parquet.

Only record-level metrics are supported at this time. Byte-level metrics are not currently supported and are complicated somewhat by column compression.
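For reviewers, here is a minimal sketch of how a job opts in to the new manager. This is illustrative only and not part of the patch; the codec string must be a name understood by Parquet's CompressionCodecName, and the shuffled classes must be Avro IndexedRecords for the Parquet path to engage:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("parquet-shuffle-example")
      // "parquet" is the new short alias registered in SparkEnv
      .set("spark.shuffle.manager", "parquet")
      // Optional Parquet knobs added by this patch; unset keys fall back to
      // the defaults defined on org.apache.parquet.hadoop.ParquetWriter
      .set("spark.shuffle.parquet.compression", "SNAPPY")
      .set("spark.shuffle.parquet.blocksize", (128 * 1024 * 1024).toString)
      .set("spark.shuffle.parquet.pagesize", (1024 * 1024).toString)
      .set("spark.shuffle.parquet.enabledictionary", "true")
      // Optional: shuffles over schema-less classes use sort-based shuffle
      .set("spark.shuffle.parquet.fallback", "sort")
    val sc = new SparkContext(conf)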
--- core/pom.xml | 8 + .../scala/org/apache/spark/SparkEnv.scala | 15 +- .../shuffle/parquet/ErrorShuffleManager.scala | 85 +++++++ .../parquet/ParquetShuffleConfig.scala | 128 ++++++++++ .../parquet/ParquetShuffleManager.scala | 170 +++++++++++++ .../parquet/ParquetShuffleReader.scala | 122 +++++++++ .../parquet/ParquetShuffleWriter.scala | 107 ++++++++ .../spark/shuffle/parquet/avro/AvroPair.scala | 83 ++++++ .../shuffle/parquet/avro/AvroTestEntity.java | 240 ++++++++++++++++++ .../spark/shuffle/parquet/avro/tests.avdl | 29 +++ .../scala/org/apache/spark/ShuffleSuite.scala | 6 +- .../shuffle/parquet/ParquetShuffleSuite.scala | 111 ++++++++ docs/configuration.md | 12 +- pom.xml | 2 +- 14 files changed, 1102 insertions(+), 16 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala create mode 100644 core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala create mode 100644 core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleManager.scala create mode 100644 core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala create mode 100644 core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleWriter.scala create mode 100644 core/src/main/scala/org/apache/spark/shuffle/parquet/avro/AvroPair.scala create mode 100644 core/src/test/java/org/apache/spark/shuffle/parquet/avro/AvroTestEntity.java create mode 100644 core/src/test/resources/org/apache/spark/shuffle/parquet/avro/tests.avdl create mode 100644 core/src/test/scala/org/apache/spark/shuffle/parquet/ParquetShuffleSuite.scala diff --git a/core/pom.xml b/core/pom.xml index a46292c13bcc0..8fac5eb62fbc6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -51,6 +51,14 @@ com.twitter chill-java + + org.apache.parquet + parquet-avro + + + org.apache.parquet + parquet-hadoop + org.apache.hadoop hadoop-client diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index c6fef7f91f00c..95de09785ce16 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -155,6 +155,13 @@ class SparkEnv ( object SparkEnv extends Logging { @volatile private var env: SparkEnv = _ + // Let the user specify short names for shuffle managers + val shuffleManagerAliases = Map( + "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", + "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", + "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager", + "parquet" -> "org.apache.spark.shuffle.parquet.ParquetShuffleManager") + private[spark] val driverActorSystemName = "sparkDriver" private[spark] val executorActorSystemName = "sparkExecutor" @@ -314,13 +321,9 @@ object SparkEnv extends Logging { new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) - // Let the user specify short names for shuffle managers - val shortShuffleMgrNames = Map( - "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", - "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", - "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager") val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") - val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) + val shuffleMgrClass = shuffleManagerAliases + .getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = 
instantiateClass[ShuffleManager](shuffleMgrClass) val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores) diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala new file mode 100644 index 0000000000000..437f9aae9b680 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.parquet + +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.shuffle._ +import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.{ShuffleDependency, TaskContext} + +class ErrorShuffleManager extends ShuffleManager { + + private def throwError(error: String) = { + throw new NotImplementedError( + s"${ParquetShuffleConfig.fallbackShuffleManager} not defined: ${error}") + } + + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C](shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { + throwError(s"Unable to register shuffle for keyClass=${dependency.keyClassName} " + + s"valueClass=${dependency.valueClassName} combineClass=${dependency.combinerClassName}") + } + + /** + * Return a resolver capable of retrieving shuffle block data based on block coordinates. + */ + override def shuffleBlockResolver: ShuffleBlockResolver = new ShuffleBlockResolver { + + override def stop(): Unit = {} // no-op + + /** + * Retrieve the data for the specified block. If the data for that block is not available, + * throws an unspecified exception. + */ + override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { + throwError(s"Unable to get block data for ${blockId}") + } + } + + /** Shut down this ShuffleManager. */ + override def stop(): Unit = {} // no-op + + /** + * Remove a shuffle's metadata from the ShuffleManager. + * @return true if the metadata removed successfully, otherwise false. + */ + override def unregisterShuffle(shuffleId: Int): Boolean = { + throwError("Unable to unregister shuffle") + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, + mapId: Int, + context: TaskContext): ShuffleWriter[K, V] = { + throwError("Unable to get a writer") + } + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. 
+ */ + override def getReader[K, C](handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { + throwError("Unable to get a reader") + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala new file mode 100644 index 0000000000000..e999d3363471e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.parquet + +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName + +import org.apache.spark.shuffle.ShuffleManager +import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkEnv} + +object ParquetShuffleConfig { + private val sparkManagerConfKey = "spark.shuffle.manager" + private val parquetManagerAlias = "parquet" + private val namespace = "spark.shuffle.parquet." 
+ private val compressionKey = namespace + "compression" + private val blocksizeKey = namespace + "blocksize" + private val pagesizeKey = namespace + "pagesize" + private val enableDictionaryKey = namespace + "enabledictionary" + private[parquet] val fallbackShuffleManager = namespace + "fallback" + + def isParquetShuffleEnabled: Boolean = { + isParquetShuffleEnabled(SparkEnv.get.conf) + } + + def isParquetShuffleEnabled(conf: SparkConf): Boolean = { + val confValue = conf.get(sparkManagerConfKey, "") + confValue == parquetManagerAlias || confValue == classOf[ParquetShuffleManager].getName + } + + def enableParquetShuffle(): Unit = { + enableParquetShuffle(SparkEnv.get.conf) + } + + def enableParquetShuffle(conf: SparkConf): Unit = { + conf.set(ParquetShuffleConfig.sparkManagerConfKey, classOf[ParquetShuffleManager].getName) + } + + def getCompression: CompressionCodecName = { + getCompression(SparkEnv.get.conf) + } + + def getCompression(conf: SparkConf): CompressionCodecName = { + val confValue = conf.get(compressionKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME + } else { + CompressionCodecName.fromConf(confValue) + } + } + + def getBlockSize: Int = { + getBlockSize(SparkEnv.get.conf) + } + + def getBlockSize(conf: SparkConf): Int = { + val confValue = conf.get(blocksizeKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_BLOCK_SIZE + } else { + confValue.toInt + } + } + + def getPageSize: Int = { + getPageSize(SparkEnv.get.conf) + } + + def getPageSize(conf: SparkConf): Int = { + val confValue = conf.get(pagesizeKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_PAGE_SIZE + } else { + confValue.toInt + } + } + + def isDictionaryEnabled: Boolean = { + isDictionaryEnabled(SparkEnv.get.conf) + } + + def isDictionaryEnabled(conf: SparkConf): Boolean = { + val confValue = conf.get(enableDictionaryKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED + } else { + confValue.toBoolean + } + } + + def setFallbackShuffleManager(managerName: String): Unit = { + setFallbackShuffleManager(SparkEnv.get.conf, managerName) + } + + def setFallbackShuffleManager(conf: SparkConf, managerName: String): Unit = { + conf.set(fallbackShuffleManager, managerName) + } + + def getFallbackShuffleManager: ShuffleManager = { + getFallbackShuffleManager(SparkEnv.get.conf) + } + + def getFallbackShuffleManager(conf: SparkConf): ShuffleManager = { + val confValue = conf.get(fallbackShuffleManager, null) + if (confValue == null) { + new ErrorShuffleManager + } else { + val fullName = SparkEnv.shuffleManagerAliases.getOrElse(confValue, confValue) + val cls = Utils.classForName(fullName) + cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[ShuffleManager] + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleManager.scala new file mode 100644 index 0000000000000..3e216cbe27e36 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleManager.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.parquet
+
+import java.util.Collections
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.util.{Success, Try}
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.IndexedRecord
+
+import org.apache.spark._
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.parquet.avro.AvroPair
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.Utils
+
+// Returned on shuffle registration, contains schema information for readers/writers
+private[spark] class ParquetShuffleHandle[K, V, C](shuffleId: Int,
+    numMaps: Int,
+    dependency: ShuffleDependency[K, V, C],
+    val avroPairSchema: String)
+  extends BaseShuffleHandle(shuffleId, numMaps, dependency)
+
+private[spark] object ParquetShuffleManager extends Logging {
+
+  def parquetShuffleCanBeUsed[K, V, C](shuffleId: Int,
+      numMaps: Int,
+      dependency: ShuffleDependency[K, V, C]): Option[Schema] = {
+
+    def getSchema(className: String): Option[Schema] = {
+      Try(Utils.classForName(className).newInstance().asInstanceOf[IndexedRecord]) match {
+        case Success(indexedRecord) => Some(indexedRecord.getSchema)
+        case _ => None
+      }
+    }
+
+    getSchema(dependency.keyClassName) match {
+      case None =>
+        // Can't use Parquet, the key class has no schema
+        None
+      case Some(keySchema) =>
+        if (dependency.mapSideCombine) {
+          dependency.aggregator match {
+            case None =>
+              throw new AssertionError("Map-side combine requested but no aggregator defined!")
+            case Some(aggregator) =>
+              // The combiner class must also have a schema, otherwise fall back
+              dependency.combinerClassName.flatMap(getSchema)
+                .map(combinerSchema => AvroPair.makePairSchema(keySchema, combinerSchema))
+          }
+        } else {
+          // We are *not* doing a map-side combine
+          getSchema(dependency.valueClassName) match {
+            case None =>
+              // We can't use Parquet, the value class has no schema
+              None
+            case Some(valueSchema) =>
+              // Parquet shuffle files will contain key and value class pairs
+              Some(AvroPair.makePairSchema(keySchema, valueSchema))
+          }
+        }
+    }
+  }
+}
+
+private[spark] class ParquetShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+  private val fileShuffleBlockManager = new FileShuffleBlockResolver(conf)
+  private val fallbackManager = ParquetShuffleConfig.getFallbackShuffleManager(conf)
+  private val fallbackShuffleIds =
+    Collections.newSetFromMap(new ConcurrentHashMap[Int, java.lang.Boolean]())
+  private val delegatingShuffleBlockResolver = new ShuffleBlockResolver {
+
+    override def stop(): Unit = {
+      fallbackManager.shuffleBlockResolver.stop()
+      fileShuffleBlockManager.stop()
+    }
+
+    /**
+     * Retrieve the data for the specified block. If the data for that block is not available,
+     * throws an unspecified exception.
+     */
+    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+      if (fallbackShuffleIds.contains(blockId.shuffleId)) {
+        fallbackManager.shuffleBlockResolver.getBlockData(blockId)
+      } else {
+        fileShuffleBlockManager.getBlockData(blockId)
+      }
+    }
+  }
+
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
+ */ + override def registerShuffle[K, V, C](shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { + // If Parquet is supported for this shuffle, use it + ParquetShuffleManager.parquetShuffleCanBeUsed(shuffleId, numMaps, dependency) match { + case Some(schema) => + new ParquetShuffleHandle(shuffleId, numMaps, dependency, schema.toString) + case _ => + // ... otherwise, use the fallback shuffle manager + fallbackShuffleIds.add(shuffleId) + fallbackManager.registerShuffle(shuffleId, numMaps, dependency) + } + } + + /** Shut down this ShuffleManager. */ + override def stop(): Unit = delegatingShuffleBlockResolver.stop() + + /** + * Remove a shuffle's metadata from the ShuffleManager. + * @return true if the metadata removed successfully, otherwise false. + */ + override def unregisterShuffle(shuffleId: Int): Boolean = { + if (fallbackShuffleIds.remove(shuffleId)) { + // Notify the fallback shuffle manager, if it was used for this shuffle + fallbackManager.unregisterShuffle(shuffleId) + } else { + // Otherwise, remove it from the Parquet block resolver + fileShuffleBlockManager.removeShuffle(shuffleId) + } + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, + mapId: Int, + context: TaskContext): ShuffleWriter[K, V] = { + handle match { + case parquetHandle: ParquetShuffleHandle[K, V, _] => + new ParquetShuffleWriter[K, V](fileShuffleBlockManager, parquetHandle, mapId, context) + case _ => + fallbackManager.getWriter(handle, mapId, context) + } + } + + override def shuffleBlockResolver: ShuffleBlockResolver = delegatingShuffleBlockResolver + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C](handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { + handle match { + case parquetHandle: ParquetShuffleHandle[K, _, C] => + new ParquetShuffleReader(parquetHandle, startPartition, endPartition, context) + case _ => + fallbackManager.getReader(handle, startPartition, endPartition, context) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala new file mode 100644 index 0000000000000..685be3deeff5c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.shuffle.parquet + +import java.nio.file.Files + +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader + +import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.parquet.avro.AvroPair + +import org.apache.spark.shuffle.ShuffleReader +import org.apache.spark.storage.{ShuffleBlockFetcherIterator, BlockManager} +import org.apache.spark.util.CompletionIterator +import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark._ + +class ParquetShuffleReader[K, V, C]( + handle: ParquetShuffleHandle[K, _, C], + startPartition: Int, + endPartition: Int, + context: TaskContext, + blockManager: BlockManager = SparkEnv.get.blockManager, + mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) + extends ShuffleReader[K, C] with Logging { + require(endPartition == startPartition + 1, + "Parquet shuffle currently only supports fetching one partition") + + private val dep = handle.dependency + private val shuffleId = handle.shuffleId + private val reduceId = startPartition + + /** Read the combined key-values for this reduce task */ + override def read(): Iterator[Product2[K, C]] = { + val blockStreams = new ShuffleBlockFetcherIterator( + context, + blockManager.shuffleClient, + blockManager, + mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition), + // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility + SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024) + + val readMetrics = context.taskMetrics().createShuffleReadMetricsForDependency() + + val recordIterator = CompletionIterator[Product2[Any, Any], + Iterator[Product2[Any, Any]]]( + for ((blockId, inputStream) <- blockStreams; + record <- { + // Parquet needs to work with Files instead of InputStreams, so we + // (1) Request a local, temporary block to write the remote data to + val (tempBlockId, tempBlock) = blockManager.diskBlockManager.createTempLocalBlock() + // (2) Copy all data from the InputStream to the local, temporary block File. 
+             Files.copy(inputStream, tempBlock.toPath)
+             // (3) Close the InputStream, and
+             inputStream.close()
+             // (4) Read the Parquet records from the local temporary block File
+             val reader = AvroParquetReader.builder[AvroPair[K, Any]](
+               new Path(tempBlock.getCanonicalPath))
+               .build()
+             val iterator = Iterator.continually(reader.read()).takeWhile(_ != null)
+             CompletionIterator[Product2[Any, Any], Iterator[Product2[Any, Any]]](iterator, {
+               reader.close()
+               tempBlock.delete()
+             })
+           }) yield {
+        // Update the read metrics for each record that is read
+        readMetrics.incRecordsRead(1)
+        record
+      },
+      // When the iterator completes, update all the shuffle metrics
+      context.taskMetrics().updateShuffleReadMetrics())
+
+    // An interruptible iterator must be used here in order to support task cancellation
+    val interruptibleIter = new InterruptibleIterator[Product2[Any, Any]](context, recordIterator)
+
+    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
+      if (dep.mapSideCombine) {
+        // We are reading values that are already combined
+        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
+      } else {
+        // We don't know the value type, but also don't care -- the dependency *should*
+        // have made sure it's compatible w/ this aggregator, which will convert the value
+        // type to the combined type C
+        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
+      }
+    } else {
+      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
+      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
+    }
+
+    // Sort the output if there is a sort ordering defined.
+    dep.keyOrdering match {
+      case Some(keyOrd: Ordering[K]) =>
+        // TODO: Create a sorter that can spill to Parquet files
+        val ser = Serializer.getSerializer(dep.serializer)
+        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
+        sorter.insertAll(aggregatedIter)
+        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
+        sorter.iterator
+      case None =>
+        aggregatedIter
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleWriter.scala
new file mode 100644
index 0000000000000..1c6697748e69a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleWriter.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.parquet
+
+import java.io.File
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.avro.Schema
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.parquet.avro.AvroPair
+import org.apache.spark.shuffle.{FileShuffleBlockResolver, ShuffleWriter}
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
+
+case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
+
+class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
+    handle: ParquetShuffleHandle[K, V, _],
+    mapId: Int,
+    context: TaskContext) extends ShuffleWriter[K, V] with Logging {
+  private val dep = handle.dependency
+  private val numOutputSplits = dep.partitioner.numPartitions
+  private val blockManager = SparkEnv.get.blockManager
+  private var stopping = false
+
+  private val metrics = context.taskMetrics()
+  private val writeMetrics = new ShuffleWriteMetrics()
+  metrics.shuffleWriteMetrics = Some(writeMetrics)
+
+  // Parse the serialized avro schema (json) into an Avro Schema object
+  private val avroSchema = new Schema.Parser().parse(handle.avroPairSchema)
+
+  private val ser = Serializer.getSerializer(dep.serializer.orNull)
+  private val writers = Array.tabulate[AvroFileWriter[K]](numOutputSplits) {
+    bucketId =>
+      val blockId = ShuffleBlockId(dep.shuffleId, mapId, bucketId)
+      val outputFile = blockManager.diskBlockManager.getFile(blockId)
+      val outputPath = new Path(outputFile.getCanonicalPath)
+      AvroFileWriter(outputFile,
+        new AvroParquetWriter[AvroPair[K, Any]](outputPath,
+          avroSchema, ParquetShuffleConfig.getCompression, ParquetShuffleConfig.getBlockSize,
+          ParquetShuffleConfig.getPageSize, ParquetShuffleConfig.isDictionaryEnabled))
+  }
+
+  /** Write a bunch of records to this task's output */
+  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+    val iter = if (dep.mapSideCombine) {
+      dep.aggregator match {
+        case None =>
+          throw new AssertionError("Map-side combine requested but no aggregator defined!")
+        case Some(aggregator) =>
+          aggregator.combineValuesByKey(records, context)
+      }
+    } else {
+      records
+    }
+
+    for (elem <- iter) {
+      val bucketId = dep.partitioner.getPartition(elem._1)
+      writers(bucketId).writer.write(
+        new AvroPair[K, Any](elem._1, elem._2, avroSchema))
+      writeMetrics.incShuffleRecordsWritten(1)
+    }
+  }
+
+  /** Close this writer, passing along whether the map completed */
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+    var success = initiallySuccess
+    stopping match {
+      case true => None
+      case false =>
+        stopping = true
+        val status = Try(writers.map { avro: AvroFileWriter[K] =>
+          avro.writer.close()
+          val bytesWritten = avro.file.length()
+          writeMetrics.incShuffleBytesWritten(bytesWritten)
+          bytesWritten
+        })
+        status match {
+          case Success(sizes) =>
+            Some(MapStatus(blockManager.shuffleServerId, sizes))
+          case f: Failure[Array[Long]] =>
+            throw f.exception
+        }
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/avro/AvroPair.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/avro/AvroPair.scala
new file mode 100644
index 0000000000000..792c1ba90e2db
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/avro/AvroPair.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.parquet.avro
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Field
+import org.apache.avro.generic.IndexedRecord
+import org.apache.avro.specific.SpecificData.SchemaConstructable
+
+/**
+ * Helper class for wrapping two Avro objects inside a key-value object
+ */
+object AvroPair {
+  private val PAIR: String = classOf[AvroPair[_, _]].getName
+  private val KEY: String = "key"
+  private val VALUE: String = "value"
+  private val NULL_SCHEMA = Schema.create(Schema.Type.NULL)
+
+  def checkIsPairSchema(schema: Schema): Boolean = PAIR == schema.getFullName
+
+  /**
+   * Creates a pair schema with the key and value fields being optional to
+   * support null values
+   * @param keySchema The Avro schema for the key
+   * @param valueSchema The Avro schema for the value
+   * @return The combined pair schema
+   */
+  def makePairSchema(keySchema: Schema, valueSchema: Schema): Schema = {
+    val pair: Schema = Schema.createRecord(PAIR, null, null, false)
+    pair.setFields(List(
+      new Schema.Field(KEY, Schema.createUnion(List(NULL_SCHEMA, keySchema).asJava), "", null),
+      new Schema.Field(VALUE, Schema.createUnion(List(NULL_SCHEMA, valueSchema).asJava), "", null,
+        Field.Order.IGNORE)).asJava)
+    pair
+  }
+}
+
+class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)
+  extends IndexedRecord with Product2[K, V] with SchemaConstructable {
+  assert(AvroPair.checkIsPairSchema(schema),
+    "AvroPair can only be created with a pair schema")
+
+  // ctor for SchemaConstructable
+  def this(schema: Schema) = this(null.asInstanceOf[K], null.asInstanceOf[V], schema)
+
+  def update(key: K, value: V): AvroPair[K, V] = {
+    this._1 = key
+    this._2 = value
+    this
+  }
+
+  override def get(i: Int): AnyRef = i match {
+    case 0 => _1.asInstanceOf[AnyRef]
+    case 1 => _2.asInstanceOf[AnyRef]
+    case _ => throw new IndexOutOfBoundsException(i.toString)
+  }
+
+  override def put(i: Int, v: scala.Any): Unit = i match {
+    case 0 => _1 = v.asInstanceOf[K]
+    case 1 => _2 = v.asInstanceOf[V]
+    case _ => throw new IndexOutOfBoundsException(i.toString)
+  }
+
+  override def getSchema: Schema = schema
+
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[AvroPair[_, _]]
+}
diff --git a/core/src/test/java/org/apache/spark/shuffle/parquet/avro/AvroTestEntity.java b/core/src/test/java/org/apache/spark/shuffle/parquet/avro/AvroTestEntity.java
new file mode 100644
index 0000000000000..7d880335ba99c
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/shuffle/parquet/avro/AvroTestEntity.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.spark.shuffle.parquet.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class AvroTestEntity extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 6618460632626642454L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroTestEntity\",\"namespace\":\"org.apache.spark.shuffle.parquet.avro\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"string\"]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public java.lang.CharSequence a; + @Deprecated public java.lang.Integer b; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public AvroTestEntity() {} + + /** + * All-args constructor. + */ + public AvroTestEntity(java.lang.CharSequence a, java.lang.Integer b) { + this.a = a; + this.b = b; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return a; + case 1: return b; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: a = (java.lang.CharSequence)value$; break; + case 1: b = (java.lang.Integer)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'a' field. + */ + public java.lang.CharSequence getA() { + return a; + } + + /** + * Sets the value of the 'a' field. + * @param value the value to set. + */ + public void setA(java.lang.CharSequence value) { + this.a = value; + } + + /** + * Gets the value of the 'b' field. + */ + public java.lang.Integer getB() { + return b; + } + + /** + * Sets the value of the 'b' field. + * @param value the value to set. 
+   */
+  public void setB(java.lang.Integer value) {
+    this.b = value;
+  }
+
+  /** Creates a new AvroTestEntity RecordBuilder */
+  public static org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder newBuilder() {
+    return new org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder();
+  }
+
+  /** Creates a new AvroTestEntity RecordBuilder by copying an existing Builder */
+  public static org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder newBuilder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder other) {
+    return new org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder(other);
+  }
+
+  /** Creates a new AvroTestEntity RecordBuilder by copying an existing AvroTestEntity instance */
+  public static org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder newBuilder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity other) {
+    return new org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder(other);
+  }
+
+  /**
+   * RecordBuilder for AvroTestEntity instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<AvroTestEntity.Builder>
+    implements org.apache.avro.data.RecordBuilder<AvroTestEntity> {
+
+    private java.lang.CharSequence a;
+    private java.lang.Integer b;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.SCHEMA$);
+    }
+
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.a)) {
+        this.a = data().deepCopy(fields()[0].schema(), other.a);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.b)) {
+        this.b = data().deepCopy(fields()[1].schema(), other.b);
+        fieldSetFlags()[1] = true;
+      }
+    }
+
+    /** Creates a Builder by copying an existing AvroTestEntity instance */
+    private Builder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity other) {
+      super(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.SCHEMA$);
+      if (isValidValue(fields()[0], other.a)) {
+        this.a = data().deepCopy(fields()[0].schema(), other.a);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.b)) {
+        this.b = data().deepCopy(fields()[1].schema(), other.b);
+        fieldSetFlags()[1] = true;
+      }
+    }
+
+    /**
+     * Gets the value of the 'a' field.
+     */
+    public java.lang.CharSequence getA() {
+      return a;
+    }
+
+    /**
+     * Sets the value of the 'a' field.
+     * @param value the value to set.
+     */
+    public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder setA(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.a = value;
+      fieldSetFlags()[0] = true;
+      return this;
+    }
+
+    /**
+     * Checks whether the 'a' field has been set.
+     */
+    public boolean hasA() {
+      return fieldSetFlags()[0];
+    }
+
+
+    /**
+     * Clears the value of the 'a' field.
+     */
+    public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder clearA() {
+      a = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /**
+     * Gets the value of the 'b' field.
+     */
+    public java.lang.Integer getB() {
+      return b;
+    }
+
+    /**
+     * Sets the value of the 'b' field.
+     * @param value the value to set.
+     */
+    public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder setB(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.b = value;
+      fieldSetFlags()[1] = true;
+      return this;
+    }
+
+    /**
+     * Checks whether the 'b' field has been set.
+ */ + public boolean hasB() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'b' field. + */ + public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder clearB() { + b = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + public AvroTestEntity build() { + try { + AvroTestEntity record = new AvroTestEntity(); + record.a = fieldSetFlags()[0] ? this.a : (java.lang.CharSequence) defaultValue(fields()[0]); + record.b = fieldSetFlags()[1] ? this.b : (java.lang.Integer) defaultValue(fields()[1]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + private static final org.apache.avro.io.DatumWriter + WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$); + + private static final org.apache.avro.io.DatumReader + READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$); + +} diff --git a/core/src/test/resources/org/apache/spark/shuffle/parquet/avro/tests.avdl b/core/src/test/resources/org/apache/spark/shuffle/parquet/avro/tests.avdl new file mode 100644 index 0000000000000..e04526774d20f --- /dev/null +++ b/core/src/test/resources/org/apache/spark/shuffle/parquet/avro/tests.avdl @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// avrotools idl tests.avdl > tests.json +// avrotools compile protocol tests.json core/src/test/java/ + +@namespace("org.apache.spark.shuffle.parquet.avro") +protocol AvroParquetTest { + +record AvroTestEntity { + union {null, string} a; + union {null, int} b; +} + +} diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index d91b799ecfc08..491a874f1cb8e 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -296,7 +296,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(metrics.recordsRead === numRecords) assert(metrics.recordsWritten === numRecords) - assert(metrics.bytesWritten === metrics.byresRead) + assert(metrics.bytesWritten === metrics.bytesRead) assert(metrics.bytesWritten > 0) } @@ -312,7 +312,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(metrics.recordsRead === numRecords) assert(metrics.recordsWritten === numRecords) - assert(metrics.bytesWritten === metrics.byresRead) + assert(metrics.bytesWritten === metrics.bytesRead) assert(metrics.bytesWritten > 0) } } @@ -333,7 +333,7 @@ object ShuffleSuite { recordsWritten: Long, recordsRead: Long, bytesWritten: Long, - byresRead: Long) + bytesRead: Long) def runAndReturnMetrics(sc: SparkContext)(job: => Unit): AggregatedShuffleMetrics = { @volatile var recordsWritten: Long = 0 diff --git a/core/src/test/scala/org/apache/spark/shuffle/parquet/ParquetShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/parquet/ParquetShuffleSuite.scala new file mode 100644 index 0000000000000..2036349437267 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/parquet/ParquetShuffleSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle.parquet + +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.parquet.avro.AvroTestEntity +import org.apache.spark._ + +class ParquetShuffleSuite extends SparkFunSuite with LocalSparkContext { + + def newConf(withFallback: Boolean = false): SparkConf = { + val conf = new SparkConf() + ParquetShuffleConfig.enableParquetShuffle(conf) + if(withFallback) { + ParquetShuffleConfig.setFallbackShuffleManager(conf, "sort") + } + conf.set("spark.serializer", classOf[KryoSerializer].getName) + } + + val fallbackConf = newConf(withFallback = true) + val noFallbackConf = newConf(withFallback = false) + + test("fallback shuffle without aggregation") { + sc = new SparkContext("local", "test", fallbackConf) + val numRecords = 10000 + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(1 to numRecords, 4) + .map(key => (key, 1)) + .groupByKey() + .collect() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + + test("fallback for shuffle with aggregation") { + sc = new SparkContext("local", "test", fallbackConf) + val numRecords = 10000 + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(1 to numRecords, 4) + .flatMap(key => Array.fill(100)((key, 1))) + .countByKey() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + + test("shuffle without aggregation") { + sc = new SparkContext("local", "test", noFallbackConf) + val numRecords = 10000 + val records = for (i <- 1 to numRecords) yield { + val obj = AvroTestEntity.newBuilder().setA("test").setB(i).build() + (obj, if (i % 10 == 0) null else obj) + } + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(records, 4) + .groupByKey() + .collect() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + + test("shuffle with aggregation") { + sc = new SparkContext("local", "test", noFallbackConf) + val numRecords = 10000 + val records = for (i <- 1 to numRecords) yield { + val obj = AvroTestEntity.newBuilder().setA("agg").setB(i).build() + (obj, if (i % 10 == 0) null else obj) + } + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(records, 4) + .reduceByKey({(a, b) => AvroTestEntity.newBuilder().setA("agg").build()}) + .collect() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + +} diff --git a/docs/configuration.md b/docs/configuration.md index 1a701f18881fe..9d29ac5dba21e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -447,12 +447,12 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.manager sort - Implementation to use for shuffling data. There are three implementations available: - sort, hash and the new (1.5+) tungsten-sort. - Sort-based shuffle is more memory-efficient and is the default option starting in 1.2. 
-    Tungsten-sort is similar to the sort based shuffle, with a direct binary cache-friendly
-    implementation with a fall back to regular sort based shuffle if its requirements are not
-    met.
+    Implementation to use for shuffling data. There are four implementations available:
+    sort, hash, the new (1.5+) tungsten-sort, and
+    parquet. Sort-based shuffle is more memory-efficient and is the default
+    option starting in 1.2. Tungsten-sort is similar to the sort-based shuffle, with a direct
+    binary cache-friendly implementation that falls back to regular sort-based shuffle if
+    its requirements are not met. Parquet stores shuffle data in Apache Parquet files; it
+    requires shuffled classes with Avro schemas and is configured via the
+    spark.shuffle.parquet.* options.
diff --git a/pom.xml b/pom.xml
index 88ebceca769e9..afe5b56089e19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1601,7 +1601,7 @@
       org.apache.parquet
       parquet-avro
       ${parquet.version}
-      ${parquet.test.deps.scope}
+      ${parquet.deps.scope}
       com.twitter
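As a closing note for reviewers, the sketch below exercises the new code path end to end, mirroring ParquetShuffleSuite. It is illustrative only: AvroTestEntity is the generated test record added by this patch, and any Avro IndexedRecord with a no-arg constructor should behave the same way:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.shuffle.parquet.ParquetShuffleConfig
    import org.apache.spark.shuffle.parquet.avro.AvroTestEntity

    val conf = new SparkConf().setMaster("local").setAppName("parquet-shuffle-demo")
    conf.set("spark.serializer", classOf[KryoSerializer].getName)
    ParquetShuffleConfig.enableParquetShuffle(conf)              // sets spark.shuffle.manager
    ParquetShuffleConfig.setFallbackShuffleManager(conf, "sort") // optional safety net

    val sc = new SparkContext(conf)
    val records = sc.parallelize(1 to 1000, 4).map { i =>
      val rec = AvroTestEntity.newBuilder().setA("key" + (i % 10)).setB(i).build()
      (rec, rec)
    }
    // Both the key and value classes expose Avro schemas, so registerShuffle
    // builds an AvroPair schema up front and the shuffle data is written and
    // read as Parquet files rather than through the fallback manager.
    println(records.groupByKey().count())
    sc.stop()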