-
Notifications
You must be signed in to change notification settings - Fork 366
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INDY-2147: Stashing router + example usage #1264
Conversation
Signed-off-by: Sergey Khoroshavin <sergey.khoroshavin@dsr-corporation.com>
Signed-off-by: Sergey Khoroshavin <sergey.khoroshavin@dsr-corporation.com>
|
||
# TODO: Log stash? | ||
self._queues[result].append((message, *args)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may make sense to create methods returning how many messages are stashed for a type.
This cane be useful for
- (Unit) tests
- Metrics
queue = self._queues[reason] | ||
self._queues[reason] = [] | ||
for msg_tuple in queue: | ||
msg_type = type(msg_tuple[0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why msg
is always a tuple? Should we mention it in _process
then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because depending on use case message handler can accept arbitrary number of arguments, first of which is always message. This is already reflected in signature of _process
:
def _process(self, handler: Handler, message: Any, *args):
I would also like to reflect this in Handler type annotation, doing something like
Handler = Callable[[Any, ...], Optional[int]]
however this is unfortunately an invalid syntax
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to be merged now, comments can be processed in a separate PR.
@@ -56,49 +146,75 @@ def start_view_change(self): | |||
) | |||
self._network.send(vc) | |||
|
|||
self._stasher.unstash() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it needs to unstash messages stashed as a result of View Change only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently the only reason for stashing messages in view change service is message containing future view no, so no need for introducing extra parameters. When we actually need more reasons for stashing/unstashing it can be done in the following way:
STASH_CATCHUP = STASH + 0
STASH_FUTURE_VIEW = STASH + 1
def _validate(msg):
...
return STASH_FUTURE_VIEW
...
def start_view_change():
...
self._stasher.unstash(STASH_FUTURE_VIEW)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But what about receiving ViewChange messages during Catchup? I think we need to stash it as well, so we have at least 2 cases already...
@staticmethod | ||
def _find_primary(validators: List[str], view_no: int) -> str: | ||
return validators[view_no % len(validators)] | ||
|
||
def _is_primary(self, view_no: int) -> bool: | ||
# TODO: Do we really need this? | ||
return self._find_primary(self._data.validators, view_no) == self._data.name | ||
|
||
def _validate(self, msg: Union[ViewChange, ViewChangeAck, NewView], frm: str) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is rather a matter of taste, but I would move this into a separate class
- to be more consistent with Ordered (Replica) Service
- Be able to used shared validation
- Easier to replace implementation (especially useful during integration phase)
- Break the monolith of services
- Have more explicit place for all validation
- Easier to cover by unit tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is planned in near future
|
||
return self._quorums.view_change_ack.is_reached(len(self._acks[self._digest])) | ||
|
||
def add_view_change(self, msg: ViewChange) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are we going to use the return value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was planning to raise suspicion if it returned false. Probably it should return either tuple(ok: bool, err: str) or just string.
No description provided.