From fbcc88bfb0d2fb6dbaae9664d6f0852b71e64f2b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 10 May 2018 17:18:16 +0800 Subject: [PATCH 1/6] commit for continuous map output tracker --- .../org/apache/spark/MapOutputTracker.scala | 35 +++++++++++++++++-- .../scala/org/apache/spark/SparkEnv.scala | 3 ++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 73646051f264c..33da90cc74ffd 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -213,6 +213,8 @@ private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int) extends MapOutputTrackerMessage private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage +private[spark] case class CheckNoMissingPartitions(shuffleId: Int) + extends MapOutputTrackerMessage private[spark] case class GetMapOutputMessage(shuffleId: Int, context: RpcCallContext) @@ -233,6 +235,14 @@ private[spark] class MapOutputTrackerMasterEndpoint( logInfo("MapOutputTrackerMasterEndpoint stopped!") context.reply(true) stop() + + case CheckNoMissingPartitions(shuffleId: Int) => + logInfo("") + if (tracker.findMissingPartitions(shuffleId).isEmpty) { + context.reply(true) + } else { + context.reply(false) + } } } @@ -691,7 +701,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr * * (It would be nice to remove this restriction in the future.) */ - private def getStatuses(shuffleId: Int): Array[MapStatus] = { + def getStatuses(shuffleId: Int): Array[MapStatus] = { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") @@ -747,7 +757,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } - /** Unregister shuffle data. */ def unregisterShuffle(shuffleId: Int): Unit = { mapStatuses.remove(shuffleId) @@ -769,6 +778,28 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } +/** + * MapOutputTrackerWorker for continuous processing, its mainly difference with MapOutputTracker + * is waiting for a time when the upstream's map output status not ready. + */ +private[spark] class ContinuousProcessingMapOutputTrackerWorker(conf: SparkConf) + extends MapOutputTrackerWorker(conf) { + /** + * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize + * on this array when reading it, because on the driver, we may be changing it in place. + * + * (It would be nice to remove this restriction in the future.) 
+ */ + override def getStatuses(shuffleId: Int): Array[MapStatus] = { + while (!askTracker[Boolean](CheckNoMissingPartitions(shuffleId))) { + synchronized { + this.wait(conf.getTimeAsMs("spark.cp.status.retryWait", "1s")) + } + } + super.getStatuses(shuffleId) + } +} + private[spark] object MapOutputTracker extends Logging { val ENDPOINT_NAME = "MapOutputTracker" diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72123f2232532..59cef2a36e0ca 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -227,6 +227,7 @@ object SparkEnv extends Logging { mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER + val isContinuous = conf.getBoolean("spark.streaming.continuousMode", false) // Listener bus is only used on the driver if (isDriver) { @@ -303,6 +304,8 @@ object SparkEnv extends Logging { val mapOutputTracker = if (isDriver) { new MapOutputTrackerMaster(conf, broadcastManager, isLocal) + } else if (isContinuous) { + new ContinuousProcessingMapOutputTrackerWorker(conf) } else { new MapOutputTrackerWorker(conf) } From 44ae9d917c354d780071a8e112a118674865143d Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 4 May 2018 11:58:06 +0800 Subject: [PATCH 2/6] INF-SPARK-1382: Continuous shuffle map task implementation and output trackder support --- .../scala/org/apache/spark/Dependency.scala | 24 ++++ .../org/apache/spark/MapOutputTracker.scala | 31 +++- .../scala/org/apache/spark/SparkEnv.scala | 7 +- .../scheduler/ContinuousShuffleMapTask.scala | 135 ++++++++++++++++++ 4 files changed, 193 insertions(+), 4 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 9ea6d2fa2fd95..8c9afafc1f890 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -96,6 +96,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) } +/** + * :: DeveloperApi :: + * Represents a dependency on the output of a shuffle stage of continuous type. 
+ */ +@DeveloperApi +class ContinuousShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( + dep: ShuffleDependency[K, V, C], continuousEpoch: Int, totalShuffleNum: Int) + extends ShuffleDependency[K, V, C]( + dep.rdd, + dep.partitioner, + dep.serializer, + dep.keyOrdering, + dep.aggregator, + dep.mapSideCombine) { + + val baseShuffleId: Int = super.shuffleId + + override val shuffleId: Int = continuousEpoch * totalShuffleNum + baseShuffleId + + override val shuffleHandle: ShuffleHandle = dep.rdd.context.env.shuffleManager.registerShuffle( + shuffleId, dep.rdd.partitions.length, this) + + dep.rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) +} /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 33da90cc74ffd..57bb638c19116 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -215,6 +215,10 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int) private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage private[spark] case class CheckNoMissingPartitions(shuffleId: Int) extends MapOutputTrackerMessage +private[spark] case class CheckAndRegisterShuffle(shuffleId: Int, numMaps: Int) + extends MapOutputTrackerMessage +private[spark] case class RegisterMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) + extends MapOutputTrackerMessage private[spark] case class GetMapOutputMessage(shuffleId: Int, context: RpcCallContext) @@ -237,12 +241,24 @@ private[spark] class MapOutputTrackerMasterEndpoint( stop() case CheckNoMissingPartitions(shuffleId: Int) => - logInfo("") + logInfo(s"Checking missing partitions for $shuffleId") if (tracker.findMissingPartitions(shuffleId).isEmpty) { context.reply(true) } else { context.reply(false) } + + case CheckAndRegisterShuffle(shuffleId: Int, numMaps: Int) => + logInfo(s"Trying to register shuffle $shuffleId") + if (!tracker.shuffleStatuses.contains(shuffleId)) { + tracker.registerShuffle(shuffleId, numMaps) + logDebug(s"Shuffle $shuffleId doesn't exist, register it now") + } + context.reply(true) + + case RegisterMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) => + tracker.registerMapOutput(shuffleId, mapId, status) + context.reply(true) } } @@ -782,7 +798,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr * MapOutputTrackerWorker for continuous processing, its mainly difference with MapOutputTracker * is waiting for a time when the upstream's map output status not ready. */ -private[spark] class ContinuousProcessingMapOutputTrackerWorker(conf: SparkConf) +private[spark] class ContinuousMapOutputTrackerWorker(conf: SparkConf) extends MapOutputTrackerWorker(conf) { /** * Get or fetch the array of MapStatuses for a given shuffle ID. 
NOTE: clients MUST synchronize @@ -798,6 +814,17 @@ private[spark] class ContinuousProcessingMapOutputTrackerWorker(conf: SparkConf) } super.getStatuses(shuffleId) } + + def checkAndRegisterShuffle(shuffleId: Int, numMaps: Int): Unit = { + // check local cache first to avoid frequency connect to master + if (!mapStatuses.contains(shuffleId)) { + askTracker(CheckAndRegisterShuffle(shuffleId, numMaps)) + } + } + + def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit = { + askTracker(RegisterMapOutput(shuffleId: Int, mapId: Int, status: MapStatus)) + } } private[spark] object MapOutputTracker extends Logging { diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 59cef2a36e0ca..6ff20e7a115e2 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -140,6 +140,7 @@ object SparkEnv extends Logging { private[spark] val driverSystemName = "sparkDriver" private[spark] val executorSystemName = "sparkExecutor" + private[spark] val START_EPOCH_KEY = "__continuous_start_epoch" def set(e: SparkEnv) { env = e @@ -305,7 +306,7 @@ object SparkEnv extends Logging { val mapOutputTracker = if (isDriver) { new MapOutputTrackerMaster(conf, broadcastManager, isLocal) } else if (isContinuous) { - new ContinuousProcessingMapOutputTrackerWorker(conf) + new ContinuousMapOutputTrackerWorker(conf) } else { new MapOutputTrackerWorker(conf) } @@ -319,7 +320,9 @@ object SparkEnv extends Logging { // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, - "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) + "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, + "continuous" -> + classOf[org.apache.spark.sql.execution.streaming.continuous.ContinuousShuffleManager]) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala new file mode 100644 index 0000000000000..836897d1f99db --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.lang.management.ManagementFactory +import java.nio.ByteBuffer +import java.util.Properties + +import scala.language.existentials + +import org.apache.spark._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.shuffle.ShuffleWriter + +/** + * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner + * specified in the ShuffleDependency). + * + * See [[org.apache.spark.scheduler.Task]] for more information. + * + * @param stageId id of the stage this task belongs to + * @param stageAttemptId attempt id of the stage this task belongs to + * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized, + * the type should be (RDD[_], ShuffleDependency[_, _, _]). + * @param partition partition of the RDD this task is associated with + * @param locs preferred task execution locations for locality scheduling + * @param localProperties copy of thread-local properties set by the user on the driver side. + * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side + * and sent to executor side. + * @param totalShuffleNum total shuffle number for current job. + * + * The parameters below are optional: + * @param jobId id of the job this task belongs to + * @param appId id of the app this task belongs to + * @param appAttemptId attempt id of the app this task belongs to + */ +private[spark] class ContinuousShuffleMapTask( + stageId: Int, + stageAttemptId: Int, + taskBinary: Broadcast[Array[Byte]], + partition: Partition, + @transient private var locs: Seq[TaskLocation], + localProperties: Properties, + serializedTaskMetrics: Array[Byte], + totalShuffleNum: Int, + jobId: Option[Int] = None, + appId: Option[String] = None, + appAttemptId: Option[String] = None) + extends Task[Unit](stageId, stageAttemptId, partition.index, localProperties, + serializedTaskMetrics, jobId, appId, appAttemptId) + with Logging { + + /** A constructor used only in test suites. This does not require passing in an RDD. */ + def this(partitionId: Int) { + this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null) + } + + @transient private val preferredLocs: Seq[TaskLocation] = { + if (locs == null) Nil else locs.toSet.toSeq + } + + // TODO: Get current epoch from epoch coordinator while task restart, also epoch is Long, we + // should deal with it. + var currentEpoch = context.getLocalProperty(SparkEnv.START_EPOCH_KEY).toInt + + override def runTask(context: TaskContext): Unit = { + // Deserialize the RDD using the broadcast variable. 
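+    // Record wall-clock and CPU time spent deserializing the task binary; both values feed the
+    // executor deserialize task metrics assigned below.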
+ val threadMXBean = ManagementFactory.getThreadMXBean + val deserializeStartTime = System.currentTimeMillis() + val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime + } else 0L + val ser = SparkEnv.get.closureSerializer.newInstance() + // TODO: rdd here should be a wrap of ShuffledRowRDD which never stop + val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( + ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) + _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime + _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime + } else 0L + + var writer: ShuffleWriter[Any, Any] = null + val manager = SparkEnv.get.shuffleManager + val mapOutputTracker = SparkEnv.get.mapOutputTracker + .asInstanceOf[ContinuousMapOutputTrackerWorker] + + while (!context.isCompleted() || !context.isInterrupted()) { + try { + // Create a ContinuousShuffleDependency which has new shuffleId based on continuous epoch + val continuousDep = new ContinuousShuffleDependency(dep, currentEpoch, totalShuffleNum) + // Re-register the shuffle TO mapOutputTrackerMaster + mapOutputTracker.checkAndRegisterShuffle(continuousDep.shuffleId, rdd.partitions.length) + writer = manager.getWriter[Any, Any](continuousDep.shuffleHandle, partitionId, context) + writer.write(rdd.iterator(partition, context) + .asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) + val status = writer.stop(success = true).get + // Register map output in task cause the continuous task never success + mapOutputTracker.registerMapOutput(continuousDep.shuffleId, partitionId, status) + currentEpoch += 1 + } catch { + case e: Exception => + try { + if (writer != null) { + writer.stop(success = false) + } + } catch { + case e: Exception => + log.debug("Could not stop writer", e) + } + throw e + } + } + } + + override def preferredLocations: Seq[TaskLocation] = preferredLocs + + override def toString: String = "ContinuousShuffleMapTask(%d, %d)".format(stageId, partitionId) +} From af2d60854856e669f40a03b76fffe02dac7b79c2 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 10 May 2018 21:23:39 +0800 Subject: [PATCH 3/6] Address comments --- .../scala/org/apache/spark/Dependency.scala | 45 +++++++++++++------ .../org/apache/spark/MapOutputTracker.scala | 24 ++++++---- .../scala/org/apache/spark/SparkEnv.scala | 4 +- .../scheduler/ContinuousShuffleMapTask.scala | 12 +++-- .../apache/spark/MapOutputTrackerSuite.scala | 27 +++++++++++ 5 files changed, 82 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 8c9afafc1f890..60120dfdce6dc 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -65,15 +65,17 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) + * @param isContinuous mark the dependency is base for continuous processing or not */ @DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( - @transient private val _rdd: RDD[_ <: Product2[K, V]], + @transient val _rdd: RDD[_ <: Product2[K, V]], val partitioner: 
Partitioner, val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, - val mapSideCombine: Boolean = false) + val mapSideCombine: Boolean = false, + val isContinuous: Boolean = false) extends Dependency[Product2[K, V]] { if (mapSideCombine) { @@ -88,37 +90,52 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( private[spark] val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName) - val shuffleId: Int = _rdd.context.newShuffleId() + val shuffleId: Int = if (isContinuous) { + // This will not be reset in continuous processing, set an invalid value for now. + Int.MinValue + } else { + _rdd.context.newShuffleId() + } - val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( - shuffleId, _rdd.partitions.length, this) + val shuffleHandle: ShuffleHandle = if (isContinuous) { + null + } else { + _rdd.context.env.shuffleManager.registerShuffle( + shuffleId, _rdd.partitions.length, this) + } - _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) + if (!isContinuous) { + _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) + } } /** * :: DeveloperApi :: * Represents a dependency on the output of a shuffle stage of continuous type. + * Different with ShuffleDependency, the continuous dependency only create on Executor side, + * so the rdd in param is deserialized from taskBinary. */ @DeveloperApi class ContinuousShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( - dep: ShuffleDependency[K, V, C], continuousEpoch: Int, totalShuffleNum: Int) + rdd: RDD[_ <: Product2[K, V]], + dep: ShuffleDependency[K, V, C], + continuousEpoch: Int, + totalShuffleNum: Int, + shuffleNumMaps: Int) extends ShuffleDependency[K, V, C]( - dep.rdd, + rdd, dep.partitioner, dep.serializer, dep.keyOrdering, dep.aggregator, - dep.mapSideCombine) { + dep.mapSideCombine, true) { - val baseShuffleId: Int = super.shuffleId + val baseShuffleId: Int = dep.shuffleId override val shuffleId: Int = continuousEpoch * totalShuffleNum + baseShuffleId - override val shuffleHandle: ShuffleHandle = dep.rdd.context.env.shuffleManager.registerShuffle( - shuffleId, dep.rdd.partitions.length, this) - - dep.rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) + override val shuffleHandle: ShuffleHandle = SparkEnv.get.shuffleManager.registerShuffle( + shuffleId, shuffleNumMaps, this) } /** diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 57bb638c19116..4ffffaf1e32aa 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -242,7 +242,9 @@ private[spark] class MapOutputTrackerMasterEndpoint( case CheckNoMissingPartitions(shuffleId: Int) => logInfo(s"Checking missing partitions for $shuffleId") - if (tracker.findMissingPartitions(shuffleId).isEmpty) { + // If get None from findMissingPartitions, just return a non-empty Seq + val missing = tracker.findMissingPartitions(shuffleId).getOrElse(Seq(0)) + if (missing.isEmpty) { context.reply(true) } else { context.reply(false) @@ -795,17 +797,15 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } /** - * MapOutputTrackerWorker for continuous processing, its mainly difference with MapOutputTracker + * MapOutputTrackerWorker for continuous processing, the main difference with 
MapOutputTracker * is waiting for a time when the upstream's map output status not ready. */ private[spark] class ContinuousMapOutputTrackerWorker(conf: SparkConf) extends MapOutputTrackerWorker(conf) { /** - * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize - * on this array when reading it, because on the driver, we may be changing it in place. - * - * (It would be nice to remove this restriction in the future.) - */ + * Get or fetch the array of MapStatuses for a given shuffle ID, hold while there's missing + * partition for this shuffleId + */ override def getStatuses(shuffleId: Int): Array[MapStatus] = { while (!askTracker[Boolean](CheckNoMissingPartitions(shuffleId))) { synchronized { @@ -815,15 +815,21 @@ private[spark] class ContinuousMapOutputTrackerWorker(conf: SparkConf) super.getStatuses(shuffleId) } + /** + * Check and register the shuffleId from worker side, try to check it on local cache first. + */ def checkAndRegisterShuffle(shuffleId: Int, numMaps: Int): Unit = { // check local cache first to avoid frequency connect to master if (!mapStatuses.contains(shuffleId)) { - askTracker(CheckAndRegisterShuffle(shuffleId, numMaps)) + askTracker[Boolean](CheckAndRegisterShuffle(shuffleId, numMaps)) } } + /** + * Register current map output from worker side. + */ def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit = { - askTracker(RegisterMapOutput(shuffleId: Int, mapId: Int, status: MapStatus)) + askTracker[Boolean](RegisterMapOutput(shuffleId: Int, mapId: Int, status: MapStatus)) } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 6ff20e7a115e2..f550d7faa563c 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -320,9 +320,7 @@ object SparkEnv extends Logging { // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, - "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, - "continuous" -> - classOf[org.apache.spark.sql.execution.streaming.continuous.ContinuousShuffleManager]) + "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala index 836897d1f99db..0864df96dd263 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ContinuousShuffleMapTask.scala @@ -68,8 +68,9 @@ private[spark] class ContinuousShuffleMapTask( with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. 
*/ - def this(partitionId: Int) { - this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null) + def this(partitionId: Int, totalShuffleNum: Int) { + this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, + null, totalShuffleNum) } @transient private val preferredLocs: Seq[TaskLocation] = { @@ -103,8 +104,11 @@ private[spark] class ContinuousShuffleMapTask( while (!context.isCompleted() || !context.isInterrupted()) { try { - // Create a ContinuousShuffleDependency which has new shuffleId based on continuous epoch - val continuousDep = new ContinuousShuffleDependency(dep, currentEpoch, totalShuffleNum) + // Create a ContinuousShuffleDependency which has new shuffleId based on continuous epoch. + // Since rdd in the dependency will not be used, set null to avoid compile issues. + val continuousDep = new ContinuousShuffleDependency( + null, dep, currentEpoch, totalShuffleNum, + rdd.partitions.length) // Re-register the shuffle TO mapOutputTrackerMaster mapOutputTracker.checkAndRegisterShuffle(continuousDep.shuffleId, rdd.partitions.length) writer = manager.getWriter[Any, Any](continuousDep.shuffleHandle, partitionId, context) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 21f481d477242..404dd0c77f86b 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -125,6 +125,33 @@ class MapOutputTrackerSuite extends SparkFunSuite { rpcEnv.shutdown() } + test("continuous tracker worker register shuffle and unregister map output and fetch") { + val hostname = "localhost" + val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) + + val masterTracker = newTrackerMaster() + masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, + new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf)) + + val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf)) + val slaveTracker = new ContinuousMapOutputTrackerWorker(conf) + slaveTracker.trackerEndpoint = + slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) + + slaveTracker.checkAndRegisterShuffle(10, 1) + + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + slaveTracker.registerMapOutput(10, 0, MapStatus( + BlockManagerId("a", "hostA", 1000), Array(1000L))) + assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq === + Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + + masterTracker.stop() + slaveTracker.stop() + rpcEnv.shutdown() + slaveRpcEnv.shutdown() + } + test("remote fetch") { val hostname = "localhost" val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) From 56442dc1c7450518d9bc84b4bfeddb017daa967b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 11 May 2018 20:12:29 +0800 Subject: [PATCH 4/6] Fix mima test --- core/src/main/scala/org/apache/spark/Dependency.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 60120dfdce6dc..597e9c79285b0 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -78,6 +78,12 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val isContinuous: Boolean = 
false) extends Dependency[Product2[K, V]] { + def this(rdd: RDD[_ <: Product2[K, V]], partitioner: Partitioner, + serializer: Serializer, keyOrdering: Option[Ordering[K]], + aggregator: Option[Aggregator[K, V, C]], mapSideCombine: Boolean) = { + this(rdd, partitioner, serializer, keyOrdering, aggregator, mapSideCombine, false) + } + if (mapSideCombine) { require(aggregator.isDefined, "Map-side combine without Aggregator specified!") } From 2ac9980f30b8b50809aa780035281f6a62ad9573 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 17 May 2018 21:01:33 +0800 Subject: [PATCH 5/6] add inteface to submit all stages of one job in DAGScheduler --- .../org/apache/spark/MapOutputTracker.scala | 38 +++++++++ .../apache/spark/scheduler/DAGScheduler.scala | 76 +++++++++++++++-- .../spark/scheduler/DAGSchedulerSuite.scala | 81 +++++++++++++++++++ .../WriteToContinuousDataSourceExec.scala | 9 ++- 4 files changed, 198 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 73646051f264c..c5900f1513a23 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -769,6 +769,44 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } +/** + * MapOutputTrackerWorker for continuous processing, the main difference with MapOutputTracker + * is waiting for a time when the upstream's map output status not ready. + */ +private[spark] class ContinuousMapOutputTrackerWorker(conf: SparkConf) + extends MapOutputTrackerWorker(conf) { + + /** + * Get or fetch the array of MapStatuses for a given shuffle ID, hold while there's missing + * partition for this shuffleId + */ + override def getStatuses(shuffleId: Int): Array[MapStatus] = { + while (!askTracker[Boolean](CheckNoMissingPartitions(shuffleId))) { + synchronized { + this.wait(conf.getTimeAsMs("spark.cp.status.retryWait", "1s")) + } + } + super.getStatuses(shuffleId) + } + + /** + * Check and register the shuffleId from worker side, try to check it on local cache first. + */ + def checkAndRegisterShuffle(shuffleId: Int, numMaps: Int): Unit = { + // check local cache first to avoid frequency connect to master + if (!mapStatuses.contains(shuffleId)) { + askTracker[Boolean](CheckAndRegisterShuffle(shuffleId, numMaps)) + } + } + + /** + * Register current map output from worker side. + */ + def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit = { + askTracker[Boolean](RegisterMapOutput(shuffleId: Int, mapId: Int, status: MapStatus)) + } +} + private[spark] object MapOutputTracker extends Logging { val ENDPOINT_NAME = "MapOutputTracker" diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 78b6b34b5d2bb..8314e13b0adf4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -209,6 +209,9 @@ class DAGScheduler( private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) + private[scheduler] val isContinuous = + sc.conf.getBoolean("spark.streaming.continuousMode", false) + /** * Called by the TaskSetManager to report task's starting. 
*/ @@ -888,7 +891,18 @@ class DAGScheduler( val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) - submitStage(finalStage) + + if (isContinuous) { + // TODO: make "spark.streaming.totalShuffleNumber" as a val string + val value = properties.getProperty("spark.streaming.totalShuffleNumber") + assert(value != null && value.nonEmpty, "totalShuffleNumber must be set!") + // For continuou processing submit, in this case, we submit all stages at once + // instead of the origin submission in which child stages must be submitted only + // after parent stages complete success + submitAllStagesOnce(finalStage) + } else { + submitStage(finalStage) + } } private[scheduler] def handleMapStageSubmitted(jobId: Int, @@ -957,6 +971,47 @@ class DAGScheduler( } } + /** + * Get all stages and submit them at-once. + * + * @param finalStage the result Stage + * @return the list of all stages of current job, Ascending by stage id + */ + private def submitAllStagesOnce(finalStage: ResultStage) { + val stages = getAllAncestorStages(finalStage) + val jobId = activeJobForStage(finalStage) + logInfo(s"Start submitting ${stages.size} stages for continuous processing, jobId:${jobId}") + for (stage <- stages) { + logInfo("Submitting " + stage + " (" + stage.rdd + ")") + submitMissingTasks(stage, jobId.get) + } + } + + private[scheduler] def getAllAncestorStages(stage: Stage): List[Stage] = { + val result = new HashSet[Stage] + result += stage + val visited = new HashSet[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] + waitingForVisit.push(stage.rdd) + while(waitingForVisit.nonEmpty) { + val rdd = waitingForVisit.pop() + if (!visited(rdd)) { + visited += rdd + for (dep <- rdd.dependencies) { + dep match { + case shufDep: ShuffleDependency[_, _, _] => + val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) + result += mapStage + waitingForVisit.push(shufDep.rdd) + case narrowDep: NarrowDependency[_] => + waitingForVisit.push(narrowDep.rdd) + } + } + } + } // end while + result.toList.sortBy(_.id) + } + /** Called when stage's parents are available and we can now do its task. 
*/ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") @@ -1060,9 +1115,17 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id - new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, - taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), - Option(sc.applicationId), sc.applicationAttemptId) + if (isContinuous) { + val totalShuffleNum = + properties.getProperty("spark.streaming.totalShuffleNumber").toInt + new ContinuousShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, + taskBinary, part, locs, properties, serializedTaskMetrics, totalShuffleNum, + Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) + } else { + new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, + taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), + Option(sc.applicationId), sc.applicationAttemptId) + } } case stage: ResultStage => @@ -1087,7 +1150,7 @@ class DAGScheduler( s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) - } else { + } else if (!isContinuous) { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run markStageAsFinished(stage, None) @@ -1103,6 +1166,9 @@ class DAGScheduler( logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})") } submitWaitingChildStages(stage) + } else { + logInfo(s"Submitting tasks's size:${tasks.size}, isContinuous:${isContinuous} " + + s"Neither need mark stage as finished nor need submit waiting child stages") } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8b6ec37625eec..ceec7002363a7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -564,6 +564,87 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(scheduler.runningStages.head.isInstanceOf[ResultStage]) } + test("getAllAncestorStages should return all ancestor stages") { + val rddA = new MyRDD(sc, 1, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) + + val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1)) + + val rddC = new MyRDD(sc, 1, List(shuffleDepB), tracker = mapOutputTracker) + val rddD = new MyRDD(sc, 1, List(new OneToOneDependency(rddC))) + + val mapStageA = scheduler.createShuffleMapStage(shuffleDepA, 1) + assert(scheduler.getAllAncestorStages(mapStageA).size === 1) + + val mapStageB = scheduler.createShuffleMapStage(shuffleDepB, 1) + assert(scheduler.getAllAncestorStages(mapStageB).size === 2) + + submit(rddD, Array(0)) + assert(scheduler.activeJobs.size === 1) + + val finalStage = scheduler.activeJobs.head.finalStage + assert(scheduler.getAllAncestorStages(finalStage).size === 3) + } + + /** + * This test ensure in Continuous Processing model submitJob will submit all stages at-once + * It constructs the following chain of dependencies: + * + * [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D] <-(one)--[E] <-(one)--[F] + * \ / + * <------------- + * Here, RDD B has a shuffle dependency on RDD A, RDD C has 
shuffle dependency on both + * B and A, RDD D has shuffle dependency on RDD C, RDD E has a one-to-one dependency on RDD D. + * and RDD F has a one-to-one dependency on RDD E. + * + * The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example + * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A, + * s_B, s_C is the same meaning + * + * Note: [] means an RDD, (s_*) means a shuffle dependency, (one) means a one-to-one dependency. + */ + + test("submit All stages in continuous processing") { + // reset the test context + afterEach() + val conf = new SparkConf() + conf.set("spark.streaming.continuousMode", "true") + init(conf) + assert(sc.conf.getBoolean("spark.streaming.continuousMode", false)) + + val rddA = new MyRDD(sc, 1, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) + val s_A = shuffleDepA.shuffleId + + val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1)) + val s_B = shuffleDepB.shuffleId + + val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker) + val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1)) + val s_C = shuffleDepC.shuffleId + + val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker) + + // cache the rddE + val rddE = new MyRDD(sc, 1, List(new OneToOneDependency(rddD))).cache() + val rddF = new MyRDD(sc, 1, List(new OneToOneDependency(rddE))) + + // set cacheLocations of rddE + cacheLocations(rddE.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + val prop = new Properties(); + prop.setProperty(SparkEnv.START_EPOCH_KEY, "0") + prop.setProperty("spark.streaming.totalShuffleNumber", "3") + submit(rddF, Array(0), properties = prop) + assert(scheduler.shuffleIdToMapStage.size === 3) + assert(scheduler.activeJobs.size === 1) + assert(scheduler.runningStages.size === 4) + assert(scheduler.waitingStages.size === 0) + assert(scheduler.failedStages.size === 0) + } + test("avoid exponential blowup when getting preferred locs list") { // Build up a complex dependency graph with repeated zip operations, without preferred locations var rdd: RDD[_] = new MyRDD(sc, 1, Nil) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala index e0af3a2f1b85d..4fb1ff4305017 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter @@ -58,7 +59,13 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPla try { // Force the RDD to run so continuous processing starts; no data 
is actually being collected // to the driver, as ContinuousWriteRDD outputs nothing. - rdd.collect() + val totalShuffleNum = query.collect { case s: ShuffleExchangeExec => true }.length + sparkContext.setLocalProperty("spark.streaming.totalShuffleNumber", totalShuffleNum.toString) + sparkContext.runJob( + rdd, + (context: TaskContext, iter: Iterator[InternalRow]) => + WriteToContinuousDataSourceExec.run(writerFactory, context, iter), + rdd.partitions.indices) } catch { case _: InterruptedException => // Interruption is how continuous queries are ended, so accept and ignore the exception. From 0f070fc5b26a3e2d5c30fdd7d59c4dd8896255ac Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 17 May 2018 21:05:29 +0800 Subject: [PATCH 6/6] fix --- .../main/scala/org/apache/spark/SparkEnv.scala | 1 + .../spark/scheduler/DAGSchedulerSuite.scala | 18 +++++++++--------- .../spark/sql/execution/ShuffledRowRDD.scala | 2 ++ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72123f2232532..b648f221db6d8 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -140,6 +140,7 @@ object SparkEnv extends Logging { private[spark] val driverSystemName = "sparkDriver" private[spark] val executorSystemName = "sparkExecutor" + private[spark] val START_EPOCH_KEY = "__continuous_start_epoch" def set(e: SparkEnv) { env = e diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ceec7002363a7..c525c4ee5fb8d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -579,15 +579,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val mapStageB = scheduler.createShuffleMapStage(shuffleDepB, 1) assert(scheduler.getAllAncestorStages(mapStageB).size === 2) - - submit(rddD, Array(0)) - assert(scheduler.activeJobs.size === 1) + + submit(rddD, Array(0)) + assert(scheduler.activeJobs.size === 1) val finalStage = scheduler.activeJobs.head.finalStage assert(scheduler.getAllAncestorStages(finalStage).size === 3) } - /** + /** * This test ensure in Continuous Processing model submitJob will submit all stages at-once * It constructs the following chain of dependencies: * @@ -595,12 +595,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi * \ / * <------------- * Here, RDD B has a shuffle dependency on RDD A, RDD C has shuffle dependency on both - * B and A, RDD D has shuffle dependency on RDD C, RDD E has a one-to-one dependency on RDD D. + * B and A, RDD D has shuffle dependency on RDD C, RDD E has a one-to-one dependency on RDD D. * and RDD F has a one-to-one dependency on RDD E. - * + * * The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A, - * s_B, s_C is the same meaning + * s_B, s_C is the same meaning. * * Note: [] means an RDD, (s_*) means a shuffle dependency, (one) means a one-to-one dependency. 
*/ @@ -637,10 +637,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val prop = new Properties(); prop.setProperty(SparkEnv.START_EPOCH_KEY, "0") prop.setProperty("spark.streaming.totalShuffleNumber", "3") - submit(rddF, Array(0), properties = prop) + submit(rddF, Array(0), properties = prop) assert(scheduler.shuffleIdToMapStage.size === 3) assert(scheduler.activeJobs.size === 1) - assert(scheduler.runningStages.size === 4) + assert(scheduler.runningStages.size === 4) assert(scheduler.waitingStages.size === 0) assert(scheduler.failedStages.size === 0) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 862ee05392f37..235ec8bda2cfc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -33,6 +33,8 @@ private final class ShuffledRowRDDPartition( val startPreShufflePartitionIndex: Int, val endPreShufflePartitionIndex: Int) extends Partition { override val index: Int = postShufflePartitionIndex + + var lastEpoch : Int = Int.MinValue } /**
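
Note on the epoch-to-shuffle-ID scheme these patches rely on: ContinuousShuffleDependency derives a fresh shuffle ID per epoch as continuousEpoch * totalShuffleNum + baseShuffleId, and ContinuousMapOutputTrackerWorker polls the master until all map outputs for that ID are registered. The standalone sketch below is illustrative only (plain Scala, hypothetical object name, no Spark classes); it assumes every base shuffle ID lies in [0, totalShuffleNum), which is what keeps the IDs of different epochs disjoint.

object EpochShuffleIdSketch {
  // Same formula as ContinuousShuffleDependency.shuffleId in the patches above.
  def shuffleIdFor(epoch: Int, totalShuffleNum: Int, baseShuffleId: Int): Int = {
    require(baseShuffleId >= 0 && baseShuffleId < totalShuffleNum,
      "base shuffle IDs must lie in [0, totalShuffleNum) or IDs from different epochs could collide")
    epoch * totalShuffleNum + baseShuffleId
  }

  def main(args: Array[String]): Unit = {
    // A job with 3 shuffles (base IDs 0, 1, 2): epoch 0 uses IDs 0..2, epoch 1 uses 3..5, and so on.
    val totalShuffleNum = 3
    for (epoch <- 0 to 1; base <- 0 until totalShuffleNum) {
      println(s"epoch=$epoch base=$base -> shuffleId=${shuffleIdFor(epoch, totalShuffleNum, base)}")
    }
  }
}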