Skip to content

Commit

Permalink
dispatch is synchronous now
Browse files Browse the repository at this point in the history
in thunks and initializers calling dispatch now synchronously triggers the reducers
the reducers are still added to the reducer queue but the thunk/initializer executing is suspended till the reducer has finished
  • Loading branch information
1gravity committed Jun 27, 2023
1 parent fb0da07 commit da3c06d
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

private const val QUEUE_INITIAL_SIZE = 10

Expand Down Expand Up @@ -83,7 +86,7 @@ internal class BlocImpl<State : Any, Action : Any, SideEffect : Any, Proposal :
state = blocState,
dispatcher = thunkDispatcher,
thunks = thunks,
dispatch = reduceProcessor::send,
dispatch = { reduce(it) },
reduce = reducer
)

Expand All @@ -92,10 +95,20 @@ internal class BlocImpl<State : Any, Action : Any, SideEffect : Any, Proposal :
state = blocState,
dispatcher = initDispatcher,
initializer = initialize,
dispatch = { thunkProcessor.send(it) },
dispatch = {
val processed = thunkProcessor.send(it)
if (processed.not()) reduce(it)
},
reduce = reducer
)

/**
* Reduces the given action by suspending the coroutine until the action has been processed.
*/
private suspend fun reduce(action: Action) = suspendCoroutine { continuation ->
reduceProcessor.send(action) { continuation.resume(Unit) }
}

init {
blocLifecycle.subscribe(onStart = {
// process actions, thunks and reducers
Expand Down Expand Up @@ -130,7 +143,15 @@ internal class BlocImpl<State : Any, Action : Any, SideEffect : Any, Proposal :

// thunks are always processed first
// ThunkProcessor will send the action to ReduceProcessor if there's no matching thunk
blocLifecycle.isStarted() -> thunkProcessor.send(action)
blocLifecycle.isStarted() -> {
val processed = thunkProcessor.send(action)
if (processed.not()) {
// reducers run synchronously -> usinh runBlocking here is OK
runBlocking {
reduce(action)
}
}
}

else -> { /* NOP*/ }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal class InitializeProcessor<State : Any, Action : Any, Proposal : Any>(
private val state: BlocState<State, Proposal>,
dispatcher: CoroutineDispatcher = Dispatchers.Default,
private var initializer: Initializer<State, Action, Proposal>? = null,
private val dispatch: (Action) -> Unit,
private val dispatch: suspend (Action) -> Unit,
private val reduce: (proposal: Proposal) -> Unit
) {

Expand Down Expand Up @@ -68,7 +68,7 @@ internal class InitializeProcessor<State : Any, Action : Any, Proposal : Any>(
coroutineHelper.launch {
if (mutex.tryLock(this@InitializeProcessor)) {
val context = InitializerContext(
state = state.value,
getState = { state.value },
dispatch = dispatch,
reduce = reduce,
launchBlock = coroutineHelper::launch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop

coroutineHelper.launch {
for (element in reduceChannel) {
element.action?.run(::runReducers)
element.reducer?.run(::runReducer)
element.action?.let(::runReducers)
element.reducer?.let(::runReducer)
}
}
}
Expand All @@ -83,9 +83,9 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop
* BlocDSL:
* reduce { } -> run a Reducer Redux style
*/
internal fun send(action: Action) {
internal fun send(action: Action, onFinished: () -> Unit) {
logger.d("received reducer with action ${action.trimOutput()}")
reduceChannel.trySend(ReducerContainer(action))
reduceChannel.trySend(ReducerContainer(action = action to onFinished))
}

/**
Expand All @@ -100,7 +100,8 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop
/**
* Triggered to execute reducers with a matching Action
*/
private fun runReducers(action: Action) {
private fun runReducers(actionOnFinished: Pair<Action, () -> Unit>) {
val (action, onFinished) = actionOnFinished
logger.d("run reducers for action ${action.trimOutput()}")
getMatchingReducers(action).fold(false) { proposalEmitted, matcherReducer ->
val (_, reducer, expectsProposal) = matcherReducer
Expand All @@ -122,6 +123,7 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop
else -> proposalEmitted
}
}
onFinished()
}

private fun getMatchingReducers(action: Action) = reducers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import com.onegravity.bloc.utils.ReducerNoAction
* Wrapper class for reducers that are submitted Redux style (send(Action)) or MVVM+ style (reduce { })
*/
internal data class ReducerContainer<State : Any, Action : Any, SideEffect : Any, Proposal : Any>(
val action: Action? = null,
val action: Pair<Action, () -> Unit>? = null,
val reducer: ReducerNoAction<State, Effect<Proposal, SideEffect>>? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ internal class ThunkProcessor<State : Any, Action : Any, Proposal : Any>(
private val state: BlocState<State, Proposal>,
dispatcher: CoroutineDispatcher = Dispatchers.Default,
private val thunks: List<MatcherThunk<State, Action, Action, Proposal>> = emptyList(),
private val dispatch: (action: Action) -> Unit,
private val reduce: (proposal: Proposal) -> Unit
private val dispatch: suspend (action: Action) -> Unit,
private val reduce: suspend (proposal: Proposal) -> Unit
) {

/**
Expand Down Expand Up @@ -65,13 +65,13 @@ internal class ThunkProcessor<State : Any, Action : Any, Proposal : Any>(
* BlocDSL:
* thunk { } -> run a thunk Redux style
*/
internal fun send(action: Action) {
internal fun send(action: Action): Boolean {
logger.d("received thunk with action ${action.trimOutput()}")
if (thunks.any { it.matcher == null || it.matcher.matches(action) }) {
thunkChannel.trySend(action)
} else {
dispatch(action)
return true
}
return false
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class BlocBuilder<State : Any, Action : Any, SE : Any, Proposal : Any> {
public fun onCreate(initialize: Initializer<State, Action, Proposal>) {
when (_initialize) {
null -> _initialize = initialize
else -> logger.w("Initializer already defined -> ignoring this one")
else -> error("Initializer already defined, there can be only one!")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ package com.onegravity.bloc.utils
* ```
*/
public data class InitializerContext<State, Action, Proposal>(
val state: State,
val getState: GetState<State>,
val dispatch: Dispatcher<Action>,
val reduce: (proposal: Proposal) -> Unit,
val reduce: suspend (proposal: Proposal) -> Unit,
internal val launchBlock: Launch
)

Expand All @@ -32,7 +32,7 @@ public data class ThunkContext<State, Action, A : Action, Proposal>(
val getState: GetState<State>,
val action: A,
val dispatch: Dispatcher<Action>,
val reduce: (proposal: Proposal) -> Unit,
val reduce: suspend (proposal: Proposal) -> Unit,
internal val launchBlock: Launch
)

Expand All @@ -46,7 +46,7 @@ public data class ThunkContext<State, Action, A : Action, Proposal>(
public data class ThunkContextNoAction<State, Action, Proposal>(
val getState: GetState<State>,
val dispatch: Dispatcher<Action>,
val reduce: (proposal: Proposal) -> Unit,
val reduce: suspend (proposal: Proposal) -> Unit,
internal val launchBlock: Launch
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kotlin.test.assertEquals
class BlocInitializerExecutionTests : BaseTestClass() {

@Test
fun testWithoutInitializer() = runTests {
fun `test without initializer`() = runTests {
val lifecycleRegistry = LifecycleRegistry()
val context = BlocContextImpl(lifecycleRegistry)

Expand All @@ -30,39 +30,78 @@ class BlocInitializerExecutionTests : BaseTestClass() {
assertEquals(1, bloc.value)

lifecycleRegistry.onStart()
delay(150)
assertEquals(1, bloc.value)

lifecycleRegistry.onStop()
lifecycleRegistry.onDestroy()
}

/**
* Test regular initializer
* Test regular initializer with dispatch to reducer
*/
@Test
fun testInitializerExecutionReduxStyle() = runTests {
fun `test initializer with dispatch to reducer`() = runTests {
val lifecycleRegistry = LifecycleRegistry()
val context = BlocContextImpl(lifecycleRegistry)

val bloc = bloc<Int, Action, Unit>(context, 1) {
reduce<Increment> { state + 1 }
onCreate { dispatch(Increment) }
onCreate {
dispatch(Increment)
assertEquals(2, getState())
}
reduce<Decrement> { state - 1 }
reduce { state + 5 }
onCreate { dispatch(Decrement) }
}

assertEquals(1, bloc.value)

lifecycleRegistry.onCreate()
delay(50)
testState(bloc, null, 2)
testState(bloc, Increment, 2)

lifecycleRegistry.onStart()
delay(50)
testState(bloc, null, 2)
testState(bloc, Increment, 3)

lifecycleRegistry.onStop()
lifecycleRegistry.onDestroy()
}

/**
* Test regular initializer that triggers a thunk
*/
@Test
fun `test initializer with dispatch to thunk`() = runTests {
val lifecycleRegistry = LifecycleRegistry()
val context = BlocContextImpl(lifecycleRegistry)

val bloc = bloc<Int, Action, Unit>(context, 1) {
reduce<Increment> { state + 1 }
onCreate {
// this dispatch is caught by the thunk -> state is updated asynchronously
// (hence with delay in this case)
dispatch(Increment)
assertEquals(1, getState())
delay(200)
assertEquals(2, getState())
}
thunk<Increment> {
delay(50)
dispatch(Increment)
}
reduce<Decrement> { state - 1 }
reduce { state + 5 }
}

assertEquals(1, bloc.value)

lifecycleRegistry.onCreate()
lifecycleRegistry.onStart()

delay(200)
testState(bloc, null, 2)
testState(bloc, Increment, 3)

lifecycleRegistry.onStop()
lifecycleRegistry.onDestroy()
Expand All @@ -73,14 +112,15 @@ class BlocInitializerExecutionTests : BaseTestClass() {
* which will start processing directly dispatched actions (not by the initializer).
*/
@Test
fun testInitializerExecutionDelayed1() = runTests {
fun `test initializer with dispatch and delay 1`() = runTests {
val lifecycleRegistry = LifecycleRegistry()
val context = BlocContextImpl(lifecycleRegistry)

val bloc = bloc<Int, Action, Unit>(context, 5) {
onCreate {
delay(1000)
dispatch(Increment)
assertEquals(6, getState())
}
reduce<Increment> { state + 1 }
reduce { state + 5 }
Expand Down Expand Up @@ -112,14 +152,15 @@ class BlocInitializerExecutionTests : BaseTestClass() {
* which will start processing directly dispatched actions (not by the initializer).
*/
@Test
fun testInitializerExecutionDelayed2() = runTests {
fun `test initializer with dispatch and delay 2`() = runTests {
val lifecycleRegistry = LifecycleRegistry()
val context = BlocContextImpl(lifecycleRegistry)

val bloc = bloc<Int, Action, Unit>(context, 5) {
onCreate {
delay(1000)
dispatch(Increment)
assertEquals(6, getState())
}
reduce<Increment> { state + 1 }
reduce { state + 5 }
Expand Down Expand Up @@ -150,7 +191,7 @@ class BlocInitializerExecutionTests : BaseTestClass() {
* Test whether long running initializers still run before everything else
*/
@Test
fun testInitializerExecutionOrder() = runTests {
fun `test long-running initializer's execution order`() = runTests {
val lifecycleRegistry = LifecycleRegistry()
val context = BlocContextImpl(lifecycleRegistry)

Expand Down Expand Up @@ -194,7 +235,7 @@ class BlocInitializerExecutionTests : BaseTestClass() {
* now for MVVM+ style reducers and thunks
*/
@Test
fun testInitializerExecutionOrderMVVM() = runTests {
fun `test long-running initializer's execution order with MVVM+`() = runTests {
val lifecycleRegistry = LifecycleRegistry()
val context = BlocContextImpl(lifecycleRegistry)

Expand Down Expand Up @@ -274,7 +315,7 @@ class BlocInitializerExecutionTests : BaseTestClass() {

val bloc = bloc<Int, Int, Unit>(context, 1) {
onCreate {
reduce(state + 2)
reduce(getState() + 2)
}
reduce { state + action }
}
Expand Down
Loading

0 comments on commit da3c06d

Please sign in to comment.