Skip to content

Commit

Permalink
Coroutines: add ApolloCall.toFlow() (#1581)
Browse files Browse the repository at this point in the history
* bump kotlin to 1.3.50, coroutines to 1.3.1 and add `ApolloCall.toFlow()`

* optimize imports
  • Loading branch information
martinbonnin authored and sav007 committed Sep 18, 2019
1 parent bde3c4f commit ab63da7
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 4 deletions.
Expand Up @@ -8,8 +8,7 @@ import com.apollographql.apollo.api.Response
import com.apollographql.apollo.exception.ApolloException
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.flow.flow

private class ChannelCallback<T>(val channel: Channel<Response<T>>) : ApolloCall.Callback<T>() {

Expand Down Expand Up @@ -39,6 +38,54 @@ private fun checkCapacity(capacity: Int) {
}
}

/**
* Converts an {@link ApolloCall} to an {@link kotlinx.coroutines.flow.Flow}.
*
* @param <T> the value type.
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED}
* @return a flow which emits Responses<T>
*/
fun <T> ApolloCall<T>.toFlow(capacity: Int = Channel.UNLIMITED) = flow {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)

enqueue(ChannelCallback(channel = channel))
try {
for (item in channel) {
emit(item)
}
} finally {
cancel()
}
}

/**
* Converts an {@link ApolloQueryWatcher} to an {@link kotlinx.coroutines.flow.Flow}.
*
* @param <T> the value type.
* @param capacity the {@link Capacity} used for the underlying channel. Only {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
* and {@link kotlinx.coroutines.channels.Channel.CONFLATED} are supported at the moment
* @throws IllegalArgumentException if capacity is not {@link kotlinx.coroutines.channels.Channel.UNLIMITED}
* or {@link kotlinx.coroutines.channels.Channel.CONFLATED}
* @return a flow which emits Responses<T>
*/
fun <T> ApolloQueryWatcher<T>.toFlow(capacity: Int = Channel.UNLIMITED) = flow {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)

enqueueAndWatch(ChannelCallback(channel = channel))
try {
for (item in channel) {
emit(item)
}
} finally {
cancel()
}
}

/**
* Converts an {@link ApolloCall} to an {@link kotlinx.coroutines.channels.Channel}. The number of values produced
* by the channel is based on the {@link com.apollographql.apollo.fetcher.ResponseFetcher} used with the call.
Expand Down
Expand Up @@ -2,11 +2,14 @@ package com.apollographql.apollo

import com.apollographql.apollo.Utils.*
import com.apollographql.apollo.api.Input
import com.apollographql.apollo.api.Response
import com.apollographql.apollo.cache.normalized.lru.EvictionPolicy
import com.apollographql.apollo.cache.normalized.lru.LruNormalizedCacheFactory
import com.apollographql.apollo.coroutines.toChannel
import com.apollographql.apollo.coroutines.toDeferred
import com.apollographql.apollo.coroutines.toFlow
import com.apollographql.apollo.coroutines.toJob
import com.apollographql.apollo.exception.ApolloException
import com.apollographql.apollo.exception.ApolloParseException
import com.apollographql.apollo.fetcher.ApolloResponseFetchers
import com.apollographql.apollo.integration.normalizer.EpisodeHeroNameQuery
Expand All @@ -16,6 +19,7 @@ import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import okhttp3.Dispatcher
Expand Down Expand Up @@ -191,6 +195,37 @@ class CoroutinesApolloTest {
assertThat(channel.isClosedForReceive).isEqualTo(true)
}

@Test
fun flowCanBeRead() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val flow = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toFlow()

runBlocking {
val result = mutableListOf<Response<EpisodeHeroNameQuery.Data>>()
flow.toList(result)
assertThat(result.size).isEqualTo(1)
assertThat(result[0].data()?.hero()?.name()).isEqualTo("R2-D2")
}
}

@Test
fun flowError() {
server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense"))

val flow = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toFlow()

runBlocking {
val result = mutableListOf<Response<EpisodeHeroNameQuery.Data>>()
try {
flow.toList(result)
} catch (e: ApolloException) {
return@runBlocking
}

throw Exception("exception has not been thrown")
}
}

companion object {

Expand Down
4 changes: 2 additions & 2 deletions gradle/dependencies.gradle
Expand Up @@ -10,8 +10,8 @@ def versions = [
javaPoet : '1.9.0',
jetbrainsAnnotations : '13.0',
junit : '4.12',
kotlin : '1.3.30',
kotlinCoroutines : '1.3.0',
kotlin : '1.3.50',
kotlinCoroutines : '1.3.1',
kotlinPoet : '1.3.0',
mockito : '1.9.5',
moshi : '1.8.0',
Expand Down

0 comments on commit ab63da7

Please sign in to comment.