[SPARK-3902] [SPARK-3590] Stabilize AsyncRDDActions and add Java API #2760

Closed · wants to merge 8 commits
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java;


import java.util.List;
import java.util.concurrent.Future;

public interface JavaFutureAction<T> extends Future<T> {
Review comment (Contributor Author):

I think that it makes sense to expose an extended version of the Java Future API to users, since there may be a number of existing libraries for consuming these standard future types.


/**
* Returns the job IDs run by the underlying async operation.
*
* This returns the current snapshot of the job list. Certain operations may run multiple
* jobs, so multiple calls to this method may return different lists.
*/
List<Integer> jobIds();
}
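
Since JavaFutureAction is a plain extension of java.util.concurrent.Future, it can be handed to any existing utility written against the standard interface, and jobIds() can be polled for a snapshot of the jobs run so far. A minimal usage sketch, assuming this patch is applied and a local-mode context; the class name and the blockFor helper are illustrative, not part of the patch:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaFutureActionExample {
  // Any helper written against the plain Future interface also accepts a
  // JavaFutureAction, because the interface extends java.util.concurrent.Future.
  static <T> T blockFor(Future<T> future) throws Exception {
    return future.get();
  }

  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("future-action-example");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

    JavaFutureAction<Long> count = rdd.countAsync();
    // jobIds() is a snapshot: a single-job action like count contributes at
    // most one ID, while e.g. takeAsync may add IDs as more jobs are launched.
    List<Integer> ids = count.jobIds();
    System.out.println("jobs so far: " + ids + ", count: " + blockFor(count));
    sc.stop();
  }
}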
86 changes: 71 additions & 15 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,20 +17,21 @@

package org.apache.spark

import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try
import java.util.Collections
import java.util.concurrent.TimeUnit

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.{Failure, Try}

/**
* :: Experimental ::
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -69,6 +70,11 @@ trait FutureAction[T] extends Future[T] {
*/
override def isCompleted: Boolean

/**
* Returns whether the action has been cancelled.
*/
def isCancelled: Boolean
Review comment (Contributor Author):

This method is new; I added it to try to maintain feature parity between the Java and Scala futures.


/**
* The value of this Future.
*
@@ -96,15 +102,16 @@ trait FutureAction[T] extends Future[T] {


/**
* :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {

@volatile private var _cancelled: Boolean = false

override def cancel() {
_cancelled = true
jobWaiter.cancel()
}

@@ -143,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}

override def isCompleted: Boolean = jobWaiter.jobFinished

override def isCancelled: Boolean = _cancelled

override def value: Option[Try[T]] = {
if (jobWaiter.jobFinished) {
@@ -164,12 +173,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {

// Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +229,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
if (!cancelled) {
if (!isCancelled) {
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
} else {
throw new SparkException("Action has been cancelled")
@@ -243,10 +250,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}

/**
* Returns whether the promise has been cancelled.
*/
def cancelled: Boolean = _cancelled
override def isCancelled: Boolean = _cancelled

@throws(classOf[InterruptedException])
@throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +275,55 @@ class ComplexFutureAction[T] extends FutureAction[T] {
def jobIds = jobs

}

private[spark]
class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
extends JavaFutureAction[T] {

import scala.collection.JavaConverters._

override def isCancelled: Boolean = futureAction.isCancelled

override def isDone: Boolean = {
// According to java.util.Future's Javadoc, this returns true if the task was completed,
// whether that completion was due to successful execution, an exception, or a cancellation.
futureAction.isCancelled || futureAction.isCompleted
}

override def jobIds(): java.util.List[java.lang.Integer] = {
Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
}

private def getImpl(timeout: Duration): T = {
// This will throw TimeoutException on timeout:
Await.ready(futureAction, timeout)
futureAction.value.get match {
case scala.util.Success(value) => converter(value)
case Failure(exception) =>
if (isCancelled) {
throw new CancellationException("Job cancelled").initCause(exception)
} else {
// java.util.Future.get() wraps exceptions in ExecutionException
throw new ExecutionException("Exception thrown by job", exception)
}
}
}

override def get(): T = getImpl(Duration.Inf)

override def get(timeout: Long, unit: TimeUnit): T =
getImpl(Duration.fromNanos(unit.toNanos(timeout)))

override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
if (isDone) {
// According to java.util.Future's Javadoc, this should return false if the task is completed.
false
} else {
// We're limited in terms of the semantics we can provide here; our cancellation is
// asynchronous and doesn't provide a mechanism to not cancel if the job is running.
futureAction.cancel()
true
Review comment (Contributor):

From a purely pedantic point of view, it's still possible for two threads to call cancel() and reach here, which means two cancellations would return true and violate the contract. Not a big deal, though.

Reply (Contributor Author):

I made the cancel method synchronized to address this.

}
}

}
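
The getImpl method above maps the Scala-side outcome onto the standard java.util.Future contract: success goes through the converter, cancellation surfaces as CancellationException with the Spark exception attached as its cause, and any other failure is wrapped in ExecutionException. A minimal sketch of what a caller sees; the describeOutcome helper is hypothetical, not part of this patch:

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.spark.api.java.JavaFutureAction;

public class OutcomeExample {
  // Reports how an async action ended, mirroring the mapping in getImpl above.
  static <T> void describeOutcome(JavaFutureAction<T> action) {
    try {
      T result = action.get(30, TimeUnit.SECONDS);
      System.out.println("succeeded: " + result);
    } catch (CancellationException e) {
      // The job was cancelled; the underlying Spark exception is attached
      // as the cause via initCause.
      System.out.println("cancelled, cause: " + e.getCause());
    } catch (ExecutionException e) {
      // Failures inside the job are wrapped, matching java.util.Future.
      System.out.println("failed, cause: " + e.getCause());
    } catch (InterruptedException | TimeoutException e) {
      System.out.println("not finished: " + e);
    }
  }
}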
53 changes: 41 additions & 12 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,12 +21,14 @@ import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable, Long => JLong}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -294,8 +296,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to all elements of this RDD.
*/
def foreach(f: VoidFunction[T]) {
val cleanF = rdd.context.clean((x: T) => f.call(x))
rdd.foreach(cleanF)
rdd.foreach(x => f.call(x))
}

/**
@@ -576,16 +577,44 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name

/**
* :: Experimental ::
* The asynchronous version of the foreach action.
*
* @param f the function to apply to all the elements of the RDD
* @return a FutureAction for the action
* The asynchronous version of `count`, which returns a
* future for counting the number of elements in this RDD.
*/
@Experimental
def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
import org.apache.spark.SparkContext._
rdd.foreachAsync(x => f.call(x))
def countAsync(): JavaFutureAction[JLong] = {
new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
}

Review comment (Contributor Author):

Unfortunately, my PR breaks compatibility for this experimental Java API. However, the previous version of this method hasn't been shipped in any Spark releases yet.

Reply (Contributor):

Yeah, I think this is fine.

/**
* The asynchronous version of `collect`, which returns a future for
* retrieving an array containing all of the elements in this RDD.
*/
def collectAsync(): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
}

/**
* The asynchronous version of the `take` action, which returns a
* future for retrieving the first `num` elements of this RDD.
*/
def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
}

/**
* The asynchronous version of the `foreach` action, which
* applies a function f to all the elements of this RDD.
*/
def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
{ x => null.asInstanceOf[Void] })
}

/**
* The asynchronous version of the `foreachPartition` action, which
* applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
{ x => null.asInstanceOf[Void] })
}
}
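
Taken together, these methods let a Java program submit several actions without blocking the driver thread and collect the results later. A minimal sketch, assuming this patch, Java 8 lambdas for Spark's function interfaces, and a local-mode context; the class name and sample data are illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AsyncActionsExample {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("async-actions-example");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);

    // All three actions are submitted without blocking the driver thread.
    JavaFutureAction<Long> count = rdd.countAsync();
    JavaFutureAction<List<Integer>> firstTwo = rdd.takeAsync(2);
    JavaFutureAction<Void> printed = rdd.foreachAsync(x -> System.out.println("saw " + x));

    // Block only when each result is actually needed.
    System.out.println("count = " + count.get());
    System.out.println("first two = " + firstTwo.get());
    printed.get();  // yields null on success, since the result type is Void
    sc.stop();
  }
}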
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
@Experimental
class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {

/**