Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Coroutines extensions (#1169)
* First drop that translates an ApolloCall to a Deferred<>. This will most likely not work for calls that returns multiple values (like cache & network) * working channel * first drop for coroutines extensions. The tests are taken from the RxJava helpers. Cancellation is not 100% working at the moment. Calling channel.cancel() will make sure that channel.isClosedForReceive becomes true. But it does not dispose the interceptor chain. * cancel the call/watcher when the channel is cancelled * cancel prefetch as well * remove extra line * capacity parameter, ApolloSubscriptionCall and javadoc * added a comment * fine tune javadoc, extension function do not need the first param :) * added a test for Channel.CONFLATED * add `toDeferred` as a convenience method * only cancel() if the deferred is actually canceled
- Loading branch information
1 parent
b40a676
commit df09d73
Showing
8 changed files
with
413 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/build |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
apply plugin: 'java' | ||
apply plugin: 'kotlin' | ||
|
||
targetCompatibility = JavaVersion.VERSION_1_7 | ||
sourceCompatibility = JavaVersion.VERSION_1_7 | ||
|
||
dependencies { | ||
compileOnly project(":apollo-runtime") | ||
compileOnly project(":apollo-api") | ||
compile dep.kotlinStdLib | ||
compile dep.kotlinxCoroutines | ||
|
||
testCompile dep.junit | ||
testCompile dep.truth | ||
testCompile dep.mockWebServer | ||
testCompile dep.okhttpTestSupport | ||
} | ||
|
||
apply from: rootProject.file('gradle/gradle-mvn-push.gradle') | ||
apply from: rootProject.file('gradle/bintray.gradle') | ||
|
||
javadoc { | ||
options.encoding = 'UTF-8' | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
POM_ARTIFACT_ID=apollo-coroutine-support | ||
POM_NAME=Apollo GraphQL Kotlin coroutines Support | ||
POM_DESCRIPTION=Apollo GraphQL Kotlin coroutines bindings | ||
POM_PACKAGING=jar |
178 changes: 178 additions & 0 deletions
178
...tines-support/src/main/kotlin/com/apollographql/apollo/coroutines/CoroutinesExtensions.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
package com.apollographql.apollo.coroutines | ||
|
||
import com.apollographql.apollo.ApolloCall | ||
import com.apollographql.apollo.ApolloPrefetch | ||
import com.apollographql.apollo.ApolloQueryWatcher | ||
import com.apollographql.apollo.ApolloSubscriptionCall | ||
import com.apollographql.apollo.api.Response | ||
import com.apollographql.apollo.exception.ApolloException | ||
import kotlinx.coroutines.* | ||
import kotlinx.coroutines.channels.Channel | ||
import kotlin.coroutines.resume | ||
import kotlin.coroutines.resumeWithException | ||
|
||
private class ChannelCallback<T>(val channel: Channel<Response<T>>) : ApolloCall.Callback<T>() { | ||
|
||
override fun onResponse(response: Response<T>) { | ||
channel.offer(response) | ||
} | ||
|
||
override fun onFailure(e: ApolloException) { | ||
channel.close(e) | ||
} | ||
|
||
override fun onStatusEvent(event: ApolloCall.StatusEvent) { | ||
if (event == ApolloCall.StatusEvent.COMPLETED) { | ||
channel.close() | ||
} | ||
} | ||
} | ||
|
||
private fun checkCapacity(capacity: Int) { | ||
when (capacity) { | ||
Channel.UNLIMITED, | ||
Channel.CONFLATED -> return | ||
else -> | ||
// Everything else than UNLIMITED or CONFLATED does not guarantee that channel.offer() succeeds all the time. | ||
// We don't support these use cases for now | ||
throw IllegalArgumentException("Bad channel capacity ($capacity). Only UNLIMITED and CONFLATED are supported") | ||
} | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloCall} to an {@link kotlinx.coroutines.channels.Channel}. The number of values produced | ||
* by the channel is based on the {@link com.apollographql.apollo.fetcher.ResponseFetcher} used with the call. | ||
* | ||
* @param <T> the value type. | ||
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED} | ||
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment | ||
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED} | ||
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED} | ||
* @return the converted channel | ||
*/ | ||
fun <T> ApolloCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> { | ||
checkCapacity(capacity) | ||
val channel = Channel<Response<T>>(capacity) | ||
|
||
channel.invokeOnClose { | ||
cancel() | ||
} | ||
enqueue(ChannelCallback(channel)) | ||
|
||
return channel | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloCall} to an {@link kotlinx.coroutines.Deferred}. This is a convenience method | ||
* that will only return the first value emitted. If the more than one response is required, for an example | ||
* to retrieve cached and network response, use {@link toChannel} instead. | ||
* | ||
* @param <T> the value type. | ||
* @return the deferred | ||
*/ | ||
fun <T> ApolloCall<T>.toDeferred(): Deferred<Response<T>> { | ||
val deferred = CompletableDeferred<Response<T>>() | ||
|
||
deferred.invokeOnCompletion { | ||
if (deferred.isCancelled) { | ||
cancel() | ||
} | ||
} | ||
enqueue(object: ApolloCall.Callback<T>() { | ||
override fun onResponse(response: Response<T>) { | ||
deferred.complete(response) | ||
} | ||
|
||
override fun onFailure(e: ApolloException) { | ||
deferred.completeExceptionally(e) | ||
} | ||
}) | ||
|
||
return deferred | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloQueryWatcher} to an {@link kotlinx.coroutines.channels.Channel}. | ||
* | ||
* @param <T> the value type. | ||
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED} | ||
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment | ||
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED} | ||
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED} | ||
* @return the converted channel | ||
*/ | ||
fun <T> ApolloQueryWatcher<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> { | ||
checkCapacity(capacity) | ||
val channel = Channel<Response<T>>(capacity) | ||
|
||
channel.invokeOnClose { | ||
cancel() | ||
} | ||
enqueueAndWatch(ChannelCallback(channel)) | ||
|
||
return channel | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloSubscriptionCall} to an {@link kotlinx.coroutines.channels.Channel}. | ||
* | ||
* @param <T> the value type. | ||
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED} | ||
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment | ||
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED} | ||
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED} | ||
* @return the converted channel | ||
*/ | ||
fun <T> ApolloSubscriptionCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> { | ||
checkCapacity(capacity) | ||
val channel = Channel<Response<T>>(capacity) | ||
|
||
channel.invokeOnClose { | ||
cancel() | ||
} | ||
execute(object : ApolloSubscriptionCall.Callback<T> { | ||
override fun onResponse(response: Response<T>) { | ||
channel.offer(response) | ||
} | ||
|
||
override fun onFailure(e: ApolloException) { | ||
channel.close(e) | ||
} | ||
|
||
override fun onCompleted() { | ||
channel.close() | ||
} | ||
}) | ||
|
||
return channel | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloPrefetch} to an {@link kotlinx.coroutines.Job}. | ||
* | ||
* @param <T> the value type. | ||
* @return the converted job | ||
*/ | ||
fun ApolloPrefetch.toJob(): Job { | ||
val deferred = CompletableDeferred<Unit>() | ||
|
||
deferred.invokeOnCompletion { | ||
if (deferred.isCancelled) { | ||
cancel() | ||
} | ||
} | ||
|
||
enqueue(object : ApolloPrefetch.Callback() { | ||
override fun onSuccess() { | ||
deferred.complete(Unit) | ||
} | ||
|
||
override fun onFailure(e: ApolloException) { | ||
deferred.completeExceptionally(e) | ||
} | ||
}) | ||
|
||
return deferred | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.