Skip to content

Commit

Permalink
[squash] minor improvements, better null handling
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Aug 29, 2019
1 parent 0ee4fff commit 8c1b476
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 43 deletions.
64 changes: 28 additions & 36 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Expand Up @@ -5,12 +5,11 @@
package kotlinx.coroutines.guava

import com.google.common.util.concurrent.*
import com.google.common.util.concurrent.internal.InternalFutureFailureAccess
import com.google.common.util.concurrent.internal.InternalFutures
import java.util.concurrent.*
import kotlin.coroutines.*
import com.google.common.util.concurrent.internal.*
import kotlinx.coroutines.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import kotlin.coroutines.*

/**
* Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
Expand Down Expand Up @@ -121,13 +120,7 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
if (isDone) {
return try {
val value = Uninterruptibles.getUninterruptibly(this)
if (value == null) {
CompletableDeferred<T>().also {
it.completeExceptionally(KotlinNullPointerException())
}
} else {
CompletableDeferred(value)
}
CompletableDeferred<T>(value as T)
} catch (e: CancellationException) {
CompletableDeferred<T>().also { it.cancel(e) }
} catch (e: ExecutionException) {
Expand All @@ -142,7 +135,8 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
val deferred = CompletableDeferred<T>()
Futures.addCallback(this, object : FutureCallback<T> {
override fun onSuccess(result: T?) {
deferred.complete(result!!)
@Suppress("UNCHECKED_CAST")
deferred.complete(result as T)
}

override fun onFailure(t: Throwable) {
Expand Down Expand Up @@ -237,8 +231,9 @@ public suspend fun <T> ListenableFuture<T>.await(): T {

return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
addListener(
ToContinuation(this, cont),
MoreExecutors.directExecutor())
ToContinuation(this, cont),
MoreExecutors.directExecutor()
)
cont.invokeOnCancellation {
cancel(false)
}
Expand All @@ -253,22 +248,23 @@ public suspend fun <T> ListenableFuture<T>.await(): T {
* [ExecutionException] when thrown.
*/
private class ToContinuation<T>(
val futureToObserve: ListenableFuture<T>,
val continuation: CancellableContinuation<T>
private val futureToObserve: ListenableFuture<T>,
private val continuation: CancellableContinuation<T>
): Runnable {
override fun run() {
if (futureToObserve.isCancelled) {
continuation.cancel()
} else {
try {
continuation.resumeWith(
Result.success(Uninterruptibles.getUninterruptibly(futureToObserve)))
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future. Anything else showing up here indicates a very fundamental bug in a
// Future implementation.
continuation.resumeWithException(e.nonNullCause())
}
return
}
try {
continuation.resumeWith(
Result.success(Uninterruptibles.getUninterruptibly(futureToObserve))
)
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future. Anything else showing up here indicates a very fundamental bug in a
// Future implementation.
continuation.resumeWithException(e.nonNullCause())
}
}
}
Expand Down Expand Up @@ -339,14 +335,14 @@ private class ListenableFutureCoroutine<T>(
* - Fully correct cancellation and listener happens-after obeying [Future] and
* [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
* The best way to be correct, especially given the fun corner cases from
* [AsyncFuture.setAsync], is to just use an [AsyncFuture].
* - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture]
* [AbstractFuture.setFuture], is to just use an [AbstractFuture].
* - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AbstractFuture]
* around its input [deferred] as a state engine to establish happens-after-completion. This
* could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the
* could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
* cost of the implementation's readability.
*/
private class OuterFuture<T>(private val deferred: Deferred<T>): ListenableFuture<T> {
val innerFuture = DeferredListenableFuture(deferred)
private val innerFuture = DeferredListenableFuture(deferred)

// Adding the listener after initialization resolves partial construction hairpin problem.
//
Expand Down Expand Up @@ -458,11 +454,7 @@ private class DeferredListenableFuture<T>(
* that certain combinations of cancelled/cancelling states can't be observed.
*/
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
return if (super.cancel(mayInterruptIfRunning)) {
deferred.cancel()
true
} else {
false
}
deferred.cancel() // Job.cancel is idempotent
return super.cancel(mayInterruptIfRunning)
}
}
Expand Up @@ -427,19 +427,21 @@ class ListenableFutureTest : TestBase() {
@Test
fun testFutureCompletedWithNullAsDeferred() = runTest {
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
val future = executor.submit(Callable { null })
val deferred = GlobalScope.async {
future.asDeferred().await()
}

val future = executor.submit(Callable<Int> { null })
try {
deferred.await()
expectUnreached()
future.asDeferred().await()
} catch (e: Throwable) {
assertTrue(e is KotlinNullPointerException)
}
}

@Test
fun testNullableFutureCompletedWithNullAsDeferred() = runTest {
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
val future = executor.submit(Callable<Int?> { null })
assertNull(future.asDeferred().await())
}

@Test
fun testThrowingFutureAsDeferred() = runTest {
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
Expand Down

0 comments on commit 8c1b476

Please sign in to comment.