Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coroutine based state store #84

Closed
wants to merge 9 commits into from

Conversation

ATizik
Copy link

@ATizik ATizik commented Sep 7, 2018

Left most of queues work intact, aggregated it inside an actor. Switched state subject to channel. Removed flush subject, now it's a responsibility of the actor. Added a way to synchronously get a value from actor(meaning that it's passed as message to and actor just like set and get messages, and is processed in same iterator)

Tests pass, but with artificial delays because I couldn't at this devise a method to make actor work synchronously for tests, I'll fix it later.

Extracted stateStore interface for tests.
Added concurrency test. Be advised that original MvRxStateStore rarely fails new concurrency test, that's why I left 1000 identical tests in a row to demonstrate.

Didn't find contributing guide, so I'll await feedback.
TODO for me: fix tests and document new code

UPDATE:
Fixed tests to run synchronously like before

UPDATE_2:
Simplified flushing logic. Now it iterates over every element of an actor's channel, placing every GetQueueElement to it's own queue and processing every SetQueueElement right away. Once channel is empty getQueue is processed

UPDATE_3:
Transition to coroutines 0.26.0 with structured concurrency changes

UPDATE_4:
Updated to Kotlin 1.3.0 and Coroutines 1.0.0

@gpeal
Copy link
Collaborator

gpeal commented Sep 21, 2018

Thank you @ATizik we're going to dig in and take a close look at this ❤️

mvrx/build.gradle Outdated Show resolved Hide resolved
*
*/

internal open class MvCorStateStore<S : Any>(initialState: S, coroutineDispatcher: CoroutineDispatcher = DefaultDispatcher) : Disposable, IMvRxStateStore<S> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think if we change DefaultDispatcher to newSingleThreadContext ? I'm not sure how the actor will work on Dispatcher with multiple threads

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If my understanding is correct, it doesn't matter for actors, since actor is a coroutine and is executed sequentially. Here is relevant documentation. It might make sense to create a separate cached pool though, because Default can be starved by other tasks

@ATizik ATizik changed the title WIP Coroutine based state store Coroutine based state store Sep 24, 2018
@gpeal
Copy link
Collaborator

gpeal commented Oct 31, 2018

@ATizik Do you want to update this to stable coroutines? Now that they are stable, it's time to take a serious look at this.

@BenSchwab @elihart @hellohuanlin

@ATizik
Copy link
Author

ATizik commented Oct 31, 2018

@gpeal
Yep, I'll update MR in a few days

Alexander Tizik and others added 4 commits November 6, 2018 17:58
# Conflicts:
#	mvrx/src/main/kotlin/com/airbnb/mvrx/MvRxStateStore.kt
#	mvrx/src/test/kotlin/com/airbnb/mvrx/StateStoreTest.kt
@ATizik
Copy link
Author

ATizik commented Nov 6, 2018

Updated to Kotlin 1.3.0 and Coroutines 1.0.0

You might want to wait a bit more if you want to rely on stable api, because ConflatedBroadcastChannel is marked as @ExperimentalCoroutinesApi,
actor builder is marked as @ObsoleteCoroutinesApi, due to be replaced by complex actors:
Kotlin/kotlinx.coroutines#87

@Hesamedin
Copy link

Is there any update on this PR?
I am using Coroutines in my project and having this PR is great.
Thanks to all of you who made this possible 👍

@gpeal
Copy link
Collaborator

gpeal commented Apr 8, 2019

@Hesamedin this PR doesn't affect the external API of MvRx, just the internal implementation.

@Hesamedin
Copy link

Hesamedin commented Apr 9, 2019

Oh, I thought this is the fix for #130 :(
I am using Coroutines in my project for handling Network calls and I am wondering am I able to use Async the way you are using for Rx.execute()?

Anyways, thanks guys for building such an awesome library. I am using it and it and I love it 🥰 👍

@Dwite
Copy link

Dwite commented Apr 9, 2019

@telus you can use wrapper around Deferred or direct results from suspend function that will map your network calls to Async

@gpeal
Copy link
Collaborator

gpeal commented Apr 10, 2019

@Dwite @Hesamedin execute is just a convenience around RxJava and you could create your own extension function for coroutines. However, we may ship a separate mvrx-coroutines artifact in the future to make it work out of the box (tbd)
Something like this (untested code) should work:

fun <T : Any> CoroutineScope.execute(fn: suspend () -> T, dispatcher: Dispatcher = Dispatchers.Default, reducer: S.(Async<T>) -> S) {
    launch {
        setState { reducer(Loading()) }
       try {
           setState { reducer(Success(fn())) }
        } catch (e: Exception) {
            setState { reducer(Fail(e)) }
    }
}

@@ -12,6 +13,7 @@ buildscript {

repositories {
google()
mavenCentral()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding is that jcenter is a supercet of mavenCentral do you need both?

@@ -51,10 +60,19 @@ jacoco {
toolVersion = "0.8.2"
}

allprojects {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is needed, allprojects repos are being added in the root build.gradle

@@ -5,6 +5,15 @@ apply plugin: 'kotlin-kapt'
apply from: 'gradle-maven-push.gradle'
apply plugin: "jacoco"

buildscript {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add build script repositories at the module level?

@@ -1,5 +1,6 @@
buildscript {
ext.kotlinVersion = '1.2.41'
ext.kotlinVersion = '1.3.0'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please upgrade to latest 1.3.40

*
*/

internal open class MvCorStateStore<S : Any>(initialState: S, final override val coroutineContext: CoroutineContext = Dispatchers.Default) : Disposable, CoroutineScope, MvRxStateStore<S> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than having the class implement CoroutineScope could you have it own a coroutine scope as a private property. Android did a similar thing with fragment.lifecycleScope. Think of it as not repeating the problem of an activity being a context but rather an activity owning a context.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just for convenince since every coroutine expects to be started in CoroutineScope, and this class should be passed around by it's interface so it doesn't expose it's scope. Also in kotlin documentation activity implements coroutine scope through delegate. If you want I'll refactor to private property, I don't have a strong opinion on which way is the right one

* The observable observes the stateChannel but only emits events when the state actually changed.
*/

override val observable: Observable<S> = stateChannel.openSubscription().asObservable(coroutineContext).distinctUntilChanged()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if flipping to a coroutine store, would it make sense to go all the way and change observable to flow? cc @gpeal & @BenSchwab

* Once every SetQueueElement is processed actor iterates over GetQueueElements until no elements are left, or a new message
* is sent to an actor
*/
private val actor = actor<Job<S>>(coroutineContext, capacity = Channel.UNLIMITED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not perfectly primed on actors yet, what is the advantage of using an actor rather than a straight channel or flow here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's basically a shorhand for this:

private val queueChannel = Channel<Job<S>>(capacity = Channel.UNLIMITED)
private val actor = launch {
   queueChannel.consumeEach {

From documentation:
An actor is an entity made up of a combination of a coroutine, the state that is confined and encapsulated into this coroutine, and a channel to communicate with other coroutines. A simple actor can be written as a function, but an actor with a complex state is better suited for a class.

There is an actor coroutine builder that conveniently combines actor's mailbox channel into its scope to receive messages from and combines the send channel into the resulting job object, so that a single reference to the actor can be carried around as its handle.

@elihart elihart mentioned this pull request Sep 25, 2019
@haroldadmin
Copy link
Contributor

haroldadmin commented Dec 4, 2019

I think the job scheduling process used here could be greatly simplified using the select expression from the Kotlin Coroutines library.

The idea is to always process setState actions before getState actions, even when there are nested actions. A simple implementation could look like this:

typealias Reducer = S.() -> S
typealias Action = (S) -> Unit

val setStateJobs = Channel<Reducer<S>>(Channel.UNLIMITED)
val getStateJobs = Channel<Action<S>>(Channel.UNLIMITED)

coroutineScope.launch {
  while (isActive) {
    select<Unit> {
      setStateJobs.onReceive { reducer -> 
        val newState = reducer.invoke(currentState)
        updateState(newState)
      }
      getStateJobs.onReceive { action -> 
        action.invoke(currentState) 
      } 
    }
  }
}

The select expression lets us wait on multiple channels together. These channels are specified using different 'cases' in the select expression. It suspends while no items can be received in any of its cases, and resumes to execute the first received item. Therefore, it is safe and efficient to run it in an while loop scoped to the lifetime of the enclosing CoroutineScope.

The select expression is biased towards the first case (setStateJobs.onReceiver { ... }), and therefore it always processes the reducers before actions in this implementation.

Here is the complete implementation in my own library, and here are the tests for it.

Your thoughts, @gpeal @digitalbuddha @elihart @Dwite @ATizik ?

/**
* The stateChannel is where state changes should be pushed to.
*/
private val stateChannel = ConflatedBroadcastChannel(initialState)
Copy link

@rougsig rougsig Apr 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConflatedBroadcastChannel can drop events. It it expected behavior for MvCorStateStore?

  @Test
  fun test_ConflatedBroadcastChannel_runBlocking() {
    runBlocking {
      val channel = ConflatedBroadcastChannel<Int>()     
      async {
        channel.asFlow().collect {
          println("collected $it")
        }
      }
      async {
        (0 until 5).forEach {
          println("send $it")
          channel.send(it)
        }
        channel.cancel()
      }      
    }    
    // console out
    // send 0
    // send 1
    // send 2
    // send 3
    // send 4
    // collected 0
    // collected 4
  }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rougsig This PR has been superceded by #347

@gpeal
Copy link
Collaborator

gpeal commented May 16, 2020

@ATizik Thank you for the work you did to get MvRx started on a coroutines path. We have begin the work on 2.0 on the release/2.0.0 branch. This included #347 which makes this obsolete but thanks again for the time you put in!

@gpeal gpeal closed this May 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants