-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cancel onAction when leaving the current state #175
Conversation
I broke something during the rebase, will look into it. |
It's fixed now the test failures are the already failing onEnter and CustomIsInStateDslTest tests |
isInState: (S) -> Boolean, | ||
getState: GetState<S>, | ||
transform: (Flow<Action<S, A>>) -> Flow<Action<S, A>> | ||
) = channelFlow { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a ChannelFlow here?. wouldn't it work with just a regular Flow as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is that we emit/send from within the launch
. Just using flow { coroutineScope { ... } }
violates the context preservation requirement of flow and crashes, you can see that in the commit from before the force push
getState: GetState<S>, | ||
transform: (Flow<Action<S, A>>) -> Flow<Action<S, A>> | ||
) = channelFlow { | ||
var currentChannel: Channel<Action<S, A>>? = null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a mutex to ensure no 2 actions are processed in parallel or is there auch a guarantee already in place under the hood by channelFlow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flow itself is sequential and under the assumption that collectors are not thread safe. So when we collect the upstream here we only get the next action when the previous one was handled which happens after we send
it into currentChannel
. So we sequentially put them into the channel and it also applies to creating/closing the channel. For the processing itself we are collecting the channel as a flow again, so it's als sequential and ordering of the action processing then depends on the transform. For onAction specifically it depends on the flatMapPolicy
, CONCAT and LATEST are not parallel, while MERGE is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM and your implementation makes sense to me. Left 2 questions, but no blocker ...
Closes #128
This one is more complex then fixing onEnter. Just the boolean that we have there because we still need the action value and new actions while we are in the state should not cancel the previous flow. We basically want a
flatMapSometimesLatest
. The new operator will put new upstream actions into aChannel
whenisInState
is true. ThisChannel
is then turned into a Flow and we apply our filtering and transformation with the flatMapPolicy on it. The resultingFlow
gets collected and it's emissions are emitted to the downstream. WhenisInState
becomes false the channel is closed so that the resulting flow will get cancelled as well.