From 62691543505526f25f0f962c7bbaebe6d04510f5 Mon Sep 17 00:00:00 2001
From: Aplysia
Date: Sun, 4 Dec 2016 20:24:17 +0800
Subject: [PATCH] Return a partial result once a given percentage of tasks
 succeed

---
 .../scala/org/apache/spark/SparkContext.scala |  22 +++++
 .../spark/partial/PartialActionListener.scala |  87 ++++++++++++++++++++
 .../apache/spark/scheduler/DAGScheduler.scala |  29 ++++++-
 3 files changed, 137 insertions(+), 1 deletion(-)
 create mode 100644 core/src/main/scala/org/apache/spark/partial/PartialActionListener.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1261e3e735761..d2decbb0645e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2002,6 +2002,28 @@ class SparkContext(config: SparkConf) extends Logging {
     result
   }
 
+  /**
+   * :: DeveloperApi ::
+   * Run a job that can return a partial result, computed from a subset of the tasks.
+   */
+  @DeveloperApi
+  def runPartialJob[T, U, R](
+      rdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      evaluator: ApproximateEvaluator[U, R],
+      percent: Double): PartialResult[R] = {
+    assertNotStopped()
+    val callSite = getCallSite
+    logInfo("Starting job: " + callSite.shortForm)
+    val start = System.nanoTime
+    val cleanedFunc = clean(func)
+    val result = dagScheduler.runPartialJob(rdd, cleanedFunc, evaluator, callSite, percent,
+      localProperties.get)
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    result
+  }
+
   /**
    * Submit a job for execution and return a FutureJob holding the result.
    */
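[Reviewer note, not part of the patch] For context, a minimal sketch of how
the new API could be driven, mirroring RDD.countApprox but stopping on a task
percentage instead of a timeout. CountEvaluator is the existing private[spark]
evaluator behind countApprox, so this sketch assumes it is compiled inside
org.apache.spark; the countPartial helper itself is hypothetical.

import org.apache.spark.TaskContext
import org.apache.spark.partial.{BoundedDouble, CountEvaluator, PartialResult}
import org.apache.spark.rdd.RDD

// Hypothetical helper: approximate count that returns once `percent` of the
// tasks have succeeded rather than after a fixed timeout.
def countPartial[T](rdd: RDD[T], percent: Double): PartialResult[BoundedDouble] = {
  // Count each partition; these per-task results are what the evaluator merges.
  val countElements = (ctx: TaskContext, iter: Iterator[T]) => {
    var result = 0L
    while (iter.hasNext) {
      result += 1L
      iter.next()
    }
    result
  }
  val evaluator = new CountEvaluator(rdd.partitions.length, 0.95)
  rdd.sparkContext.runPartialJob(rdd, countElements, evaluator, percent)
}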
diff --git a/core/src/main/scala/org/apache/spark/partial/PartialActionListener.scala b/core/src/main/scala/org/apache/spark/partial/PartialActionListener.scala
new file mode 100644
index 0000000000000..527a100d97626
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/PartialActionListener.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.JobListener
+
+/**
+ * A JobListener for a partial single-result action, such as count() or non-parallel reduce().
+ * This listener waits until a specified percentage of the tasks have completed and returns a
+ * partial answer even if the complete answer is not available by then.
+ *
+ * This class assumes that the action is performed on an entire RDD[T] via a function that computes
+ * a result of type U for each partition, and that the action returns a partial or complete result
+ * of type R.
+ */
+private[spark] class PartialActionListener[T, U, R](
+    rdd: RDD[T],
+    func: (TaskContext, Iterator[T]) => U,
+    evaluator: ApproximateEvaluator[U, R],
+    percent: Double)
+  extends JobListener {
+
+  val totalTasks = rdd.partitions.length
+  val stopBound = math.ceil(totalTasks * percent).toInt
+  var finishedTasks = 0
+  var failure: Option[Exception] = None              // Set if the job has failed (permanently)
+  var resultObject: Option[PartialResult[R]] = None  // Set if we've already returned a PartialResult
+
+  override def taskSucceeded(index: Int, result: Any) {
+    synchronized {
+      evaluator.merge(index, result.asInstanceOf[U])
+      finishedTasks += 1
+      if (finishedTasks == totalTasks) {
+        // If we had already returned a PartialResult, set its final value
+        resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
+        // Notify any waiting thread that may have called awaitResult
+        this.notifyAll()
+      }
+    }
+  }
+
+  override def jobFailed(exception: Exception) {
+    synchronized {
+      failure = Some(exception)
+      this.notifyAll()
+    }
+  }
+
+  /**
+   * Waits until the given percentage of tasks have completed, then returns a PartialResult with
+   * the result so far. This may be a complete result if the whole job is done by that point.
+   */
+  def awaitResult(): PartialResult[R] = synchronized {
+    while (true) {
+      if (failure.isDefined) {
+        throw failure.get
+      } else if (finishedTasks == totalTasks) {
+        return new PartialResult(evaluator.currentResult(), true)
+      } else if (finishedTasks >= stopBound) {
+        resultObject = Some(new PartialResult(evaluator.currentResult(), false))
+        return resultObject.get
+      } else {
+        // taskSucceeded only notifies once all tasks are done, so poll for the stop bound
+        this.wait(100)
+      }
+    }
+    // Should never be reached, but required to keep the compiler happy
+    return null
+  }
+}
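[Reviewer note, not part of the patch] To make the listener's contract
concrete: awaitResult() blocks until ceil(totalTasks * percent) task results
have been merged, then returns a non-final PartialResult whose final value is
set later if the job runs to completion. A toy evaluator is enough to see the
shape of this; LongSumEvaluator below is illustrative only and, like the
existing evaluators, would need to live inside org.apache.spark.

import org.apache.spark.partial.ApproximateEvaluator

// Toy evaluator: the running result is just the sum of the per-task results.
class LongSumEvaluator extends ApproximateEvaluator[Long, Long] {
  private var sum = 0L
  override def merge(outputId: Int, taskResult: Long): Unit = { sum += taskResult }
  override def currentResult(): Long = sum
}

// On a 10-partition RDD with percent = 0.5, stopBound = ceil(10 * 0.5) = 5, so
// awaitResult() returns after the fifth taskSucceeded() call with
// isInitialValueFinal == false; if the remaining five tasks also succeed,
// taskSucceeded() sets the final value on that same PartialResult.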
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7fde34d8974c0..37e777d3d88c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,15 +29,15 @@ import scala.concurrent.duration._
 import scala.language.existentials
 import scala.language.postfixOps
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
+import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialActionListener, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
@@ -658,6 +658,33 @@ class DAGScheduler(
     listener.awaitResult() // Will throw an exception if the job fails
   }
 
+  /**
+   * Run a partial job on the given RDD and pass all the results to an ApproximateEvaluator
+   * as they arrive. Returns a partial result object from the evaluator.
+   *
+   * @param rdd target RDD to run tasks on
+   * @param func a function to run on each partition of the RDD
+   * @param evaluator [[ApproximateEvaluator]] to receive the partial results
+   * @param callSite where in the user program this job was called
+   * @param percent percentage of tasks that must complete before a partial result is returned
+   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
+   */
+  def runPartialJob[T, U, R](
+      rdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      evaluator: ApproximateEvaluator[U, R],
+      callSite: CallSite,
+      percent: Double,
+      properties: Properties): PartialResult[R] = {
+    val listener = new PartialActionListener(rdd, func, evaluator, percent)
+    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
+    val partitions = (0 until rdd.partitions.length).toArray
+    val jobId = nextJobId.getAndIncrement()
+    eventProcessLoop.post(JobSubmitted(
+      jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
+    listener.awaitResult() // Will throw an exception if the job fails
+  }
+
   /**
    * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter
    * can be used to block until the job finishes executing or can be used to cancel the job.
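[Reviewer note, not part of the patch] From the caller's side, the returned
PartialResult gives the approximate value immediately via initialValue, and
since the job keeps running after runPartialJob returns, onComplete fires with
the exact value once the remaining tasks finish. countPartial here is the
hypothetical helper sketched after the SparkContext hunk above.

val partialCount = countPartial(rdd, 0.8)  // returns once >= 80% of tasks succeed
println("Approximate count: " + partialCount.initialValue)
partialCount.onComplete { exact =>
  println("Exact count: " + exact)  // fires only if all tasks eventually succeed
}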