Skip to content
Permalink
Browse files

Use ConflatedBroadcastChannel rather than Subjects

  • Loading branch information...
chrisbanes committed Aug 6, 2019
1 parent cce8d77 commit d31ae539a06c8afc545baa96dbd7ed9bf650f12a
@@ -27,7 +27,8 @@ import com.airbnb.mvrx.MvRxViewModelFactory
import com.airbnb.mvrx.ViewModelContext
import com.squareup.inject.assisted.Assisted
import com.squareup.inject.assisted.AssistedInject
import hu.akarnokd.kotlin.flow.PublishSubject
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.launch
@@ -37,11 +38,12 @@ class SearchViewModel @AssistedInject constructor(
private val searchShows: SearchShows,
tmdbManager: TmdbManager
) : TiviMvRxViewModel<SearchViewState>(initialState) {
private val searchQuery = PublishSubject<String>()
private val searchQuery = ConflatedBroadcastChannel<String>()

init {
viewModelScope.launch {
searchQuery.debounce(300)
searchQuery.asFlow()
.debounce(300)
.collect {
viewModelScope.launchInteractor(searchShows, SearchShows.Params(it))
}
@@ -59,7 +61,7 @@ class SearchViewModel @AssistedInject constructor(

fun setSearchQuery(query: String) {
viewModelScope.launch {
searchQuery.emit(query)
searchQuery.send(query)
}
}

@@ -23,7 +23,6 @@ dependencies {
api Libs.Kotlin.stdlib

api Libs.Coroutines.core
api Libs.kotlinFlowExtensions

api Libs.Dagger.dagger

@@ -16,9 +16,10 @@

package app.tivi.util

import hu.akarnokd.kotlin.flow.BehaviorSubject
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import javax.inject.Inject
@@ -27,20 +28,20 @@ class ObservableLoadingCounter @Inject constructor(
val dispatchers: AppCoroutineDispatchers
) {
private var loaders = 0
private val loadingState = BehaviorSubject(loaders)
private val loadingState = ConflatedBroadcastChannel(loaders)

val observable: Flow<Boolean>
get() = loadingState.map { it > 0 }
get() = loadingState.asFlow().map { it > 0 }

fun addLoader() {
GlobalScope.launch(dispatchers.main) {
loadingState.emit(++loaders)
loadingState.send(++loaders)
}
}

fun removeLoader() {
GlobalScope.launch(dispatchers.main) {
loadingState.emit(--loaders)
loadingState.send(--loaders)
}
}
}
@@ -67,8 +67,6 @@ object Libs {
const val android = "org.jetbrains.kotlinx:kotlinx-coroutines-android:$version"
}

const val kotlinFlowExtensions = "com.github.akarnokd:kotlin-flow-extensions:0.0.2"

object AndroidX {
const val appcompat = "androidx.appcompat:appcompat:1.1.0-beta01"
const val browser = "androidx.browser:browser:1.0.0"
@@ -25,7 +25,8 @@ import app.tivi.data.Entry
import app.tivi.data.resultentities.EntryWithShow
import app.tivi.interactors.PagingInteractor
import app.tivi.tmdb.TmdbManager
import hu.akarnokd.kotlin.flow.BehaviorSubject
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.combineLatest
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.launch
@@ -37,8 +38,8 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract
private val logger: Logger,
private val pageSize: Int = 21
) : ViewModel() {
private val messages = BehaviorSubject<UiResource>()
private val loaded = BehaviorSubject(false)
private val messages = ConflatedBroadcastChannel<UiResource>()
private val loaded = ConflatedBroadcastChannel(false)

protected val pageListConfig = PagedList.Config.Builder().run {
setPageSize(pageSize * 3)
@@ -52,21 +53,21 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract

override fun onItemAtFrontLoaded(itemAtFront: LI) {
viewModelScope.launch {
loaded.emit(true)
loaded.offer(true)
}
}

override fun onZeroItemsLoaded() {
viewModelScope.launch {
loaded.emit(true)
loaded.offer(true)
}
}
}

val viewState = messages.combineLatest(
val viewState = messages.asFlow().combineLatest(
tmdbManager.imageProviderFlow,
pagingInteractor.observe().flowOn(pagingInteractor.dispatcher),
loaded
loaded.asFlow()
) { message, imageProvider, pagedList, loaded ->
EntryViewState(message, imageProvider, pagedList, loaded)
}
@@ -116,5 +117,5 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract
}
}

private suspend fun sendMessage(uiResource: UiResource) = messages.emit(uiResource)
private suspend fun sendMessage(uiResource: UiResource) = messages.offer(uiResource)
}
@@ -17,11 +17,12 @@
package app.tivi.interactors

import androidx.paging.PagedList
import hu.akarnokd.kotlin.flow.BehaviorSubject
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.switchMap
import kotlinx.coroutines.launch
@@ -44,20 +45,23 @@ abstract class PagingInteractor<P : PagingInteractor.Parameters<T>, T> : Subject
}

abstract class SuspendingWorkInteractor<P : Any, T> : ObservableInteractor<P, T> {
private val subject = BehaviorSubject<T>()
private val subject = ConflatedBroadcastChannel<T>()
private val flow = subject.asFlow()

override suspend operator fun invoke(params: P) = subject.emit(doWork(params))
override suspend operator fun invoke(params: P) = subject.send(doWork(params))

abstract suspend fun doWork(params: P): T

override fun observe(): Flow<T> = subject
override fun observe(): Flow<T> = flow
}

abstract class SubjectInteractor<P : Any, T> : ObservableInteractor<P, T> {
private val subject = BehaviorSubject<P>()
private val flow = subject.distinctUntilChanged().switchMap { createObservable(it) }
private val subject = ConflatedBroadcastChannel<P>()
private val flow = subject.asFlow()
.distinctUntilChanged()
.switchMap { createObservable(it) }

override suspend operator fun invoke(params: P) = subject.emit(params)
override suspend operator fun invoke(params: P) = subject.send(params)

protected abstract fun createObservable(params: P): Flow<T>

@@ -20,9 +20,10 @@ import app.tivi.extensions.fetchBodyWithRetry
import app.tivi.util.AppCoroutineDispatchers
import com.uwetrottmann.tmdb2.Tmdb
import com.uwetrottmann.tmdb2.entities.Configuration
import hu.akarnokd.kotlin.flow.BehaviorSubject
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import javax.inject.Inject
@@ -33,9 +34,8 @@ class TmdbManager @Inject constructor(
private val dispatchers: AppCoroutineDispatchers,
private val tmdbClient: Tmdb
) {
private val imageProviderSubject = BehaviorSubject(TmdbImageUrlProvider())
val imageProviderFlow: Flow<TmdbImageUrlProvider>
get() = imageProviderSubject
private val imageProviderSubject = ConflatedBroadcastChannel(TmdbImageUrlProvider())
val imageProviderFlow: Flow<TmdbImageUrlProvider> = imageProviderSubject.asFlow()

init {
refreshConfiguration()
@@ -61,7 +61,7 @@ class TmdbManager @Inject constructor(
images.secure_base_url!!,
images.poster_sizes ?: emptyList(),
images.backdrop_sizes ?: emptyList())
imageProviderSubject.emit(newProvider)
imageProviderSubject.send(newProvider)
}
}
}
@@ -24,9 +24,10 @@ import app.tivi.util.AppCoroutineDispatchers
import app.tivi.util.Logger
import com.uwetrottmann.trakt5.TraktV2
import dagger.Lazy
import hu.akarnokd.kotlin.flow.BehaviorSubject
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
@@ -53,16 +54,15 @@ class TraktManager @Inject constructor(
private val logger: Logger,
private val traktClient: Lazy<TraktV2>
) {
private val authState = BehaviorSubject<AuthState>()
private val authState = ConflatedBroadcastChannel<AuthState>()

private val _state = BehaviorSubject(TraktAuthState.LOGGED_OUT)
val state: Flow<TraktAuthState>
get() = _state
private val _state = ConflatedBroadcastChannel(TraktAuthState.LOGGED_OUT)
val state: Flow<TraktAuthState> = _state.asFlow()

init {
// Observer which updates local state
GlobalScope.launch(dispatchers.main) {
authState.collect { authState ->
authState.asFlow().collect { authState ->
updateAuthState(authState)

traktClient.get().apply {
@@ -77,15 +77,15 @@ class TraktManager @Inject constructor(
val state = withContext(dispatchers.io) {
readAuthState()
}
authState.emit(state)
authState.send(state)
}
}

private suspend fun updateAuthState(authState: AuthState) {
if (authState.isAuthorized) {
_state.emit(TraktAuthState.LOGGED_IN)
_state.send(TraktAuthState.LOGGED_IN)
} else {
_state.emit(TraktAuthState.LOGGED_OUT)
_state.send(TraktAuthState.LOGGED_OUT)
}
}

@@ -112,7 +112,7 @@ class TraktManager @Inject constructor(
val newState = AuthState().apply { update(response, ex) }
GlobalScope.launch(dispatchers.main) {
// Update our local state
authState.emit(newState)
authState.send(newState)
}
GlobalScope.launch(dispatchers.io) {
// Persist auth state

0 comments on commit d31ae53

Please sign in to comment.
You can’t perform that action at this time.