Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Select.ready() async iterable #62

Conversation

leandro-lucarella-frequenz
Copy link
Contributor

Make Select.ready() an async iterator

The async iterator yields a set of receivers that are ready to be consumed. Users need to consume() explicitly from the receivers that are ready and are not automatically consumed() by the select object if they were not consumed in the select loop.

Example:

select = Select(recv1, recv2)
async for ready_set in select.ready():
    if recv1 in ready_set:
        msg = recv1.consume()
        # do whatever with msg, consume() can also raise an error as normal
    if recv2 in ready_set:
        msg = recv2.consume()

If a receiver is stopped, then it will be automatically removed from the select loop.

Fixes #47.

@leandro-lucarella-frequenz leandro-lucarella-frequenz added this to the v0.11.0 milestone Dec 30, 2022
@leandro-lucarella-frequenz leandro-lucarella-frequenz added the type:enhancement New feature or enhancement visitble to users label Dec 30, 2022
@github-actions github-actions bot added part:channels Affects channels implementation part:docs Affects the documentation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:tests Affects the unit, integration and performance (benchmarks) tests part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) labels Dec 30, 2022
@leandro-lucarella-frequenz
Copy link
Contributor Author

This is a new attempt (based on #61, so excuse the duplicated commits), I got it as a draft in my repo from the time I was working on it and cleaned it up a bit while cleaning up the exceptions stuff that I also had in draft.

For me it makes the interface (and implementation) of select much simpler and clear. Maybe we can try it on the new comers to see if it really is.

@leandro-lucarella-frequenz
Copy link
Contributor Author

Also updated. Moved the Don't raise exceptions in Receiver.ready() commit to #61 too, so only the last commit (Make Select.ready() an async iterator) is relevant for this PR (the others should be reviewed in #61).

The async iterator yields a set of receivers that are ready to be
consumed. Users need to consume() explicitly from the receivers that
are ready and are not automatically consumed() by the select object if
they were not consumed in the select loop.

If a receiver is stopped, then it will be automatically removed from the
select loop.

Signed-off-by: Leandro Lucarella <leandro.lucarella@frequenz.com>
@github-actions github-actions bot removed the part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) label Feb 10, 2023
@github-actions github-actions bot removed part:channels Affects channels implementation part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) labels Feb 10, 2023
@leandro-lucarella-frequenz
Copy link
Contributor Author

Rebased on the current head.

Copy link
Contributor

@shsms shsms left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Luca!

I think the biggest benefit of this implementation is that we don't end up losing type information for received values, because users are reading them directly from the receiver.

But some other features seem to have gone away, I've mentioned those in individual comments.

@@ -117,82 +80,41 @@ async def stop(self) -> None:
await asyncio.gather(*self._pending, return_exceptions=True)
self._pending = set()

async def ready(self) -> bool:
async def ready(self) -> AsyncIterator[Set[Receiver[Any]]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is not really a ready test anymore. Might as well rename it to __aiter__, so that below is possible:

async for ready in Select(recv1, recv2):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. I thought of updating the name but I thought it still made sense thinking that it returns a set of ready receivers, so like "get me the ready receivers", but I'm open to rename it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ready doesn't work so well anymore, because it is called only once, where as in the original interface, it was called after iteration.

Also, if we rename, it would then be a proper break from the previous interface, instead of having the same function but not working the same way for some reason.

"""
self._receivers = kwargs
self._receivers: Dict[str, Receiver[Any]] = {
f"0x{id(r):x}": r for r in receivers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be nice to be consistent with our use of id or hash. If we want to use id here, then the iterator should return list, for which the in operator uses id to compare (through eq -> is -> id). If we want to return a set from the iterator, which uses hash in the in operator, we should use hash here too.

My preference is to use id here and return a list, because that is explicit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, I only used id() here for debugging purposes. I use the key as the task name and also when you print(instance) it gives you the id, not the hash (if str is not customized). I didn't thought about id vs hash at all.

I see your point about consistency and mixing id with hash when using in with the results of the iterator, but using hash will make things harder to debug and in practice there shouldn't be any differences, id and hash should both work to identify an instance uniquely (assuming receivers won't override hash in a way that they expect 2 different Receiver instances produce the same hash on purpose).

I'd rather not return a list in the iterator because a set makes it clear that there are no duplicates. With set is like we are encoding the unique property into the type system. Also returning a list means we need a linear search over the ready_set for the in operator (I know this is only a conceptual annoyance as with small lists, which is probably what we'll have, a linear search should be fast enough anyway).

If you are still concerned about the id/hash inconsistency, I would rather use hash() for the key/task name and sacrifice some debuggability instead of having less ideal API typing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you say we'll sacrifice readability if we use hash?

>>> hash(object())
8759965091979
>>> id(object())
140159441471728

they both look similarly difficult to read, for me. :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not readability, debuggability, because if you do print(object) or print(f"this is my Object: {object}") you will get the id() (if __str__() is not override) and not hash(), but then when looking at the _ready_set keys you'll get the hash.

It is a minor thing, this is why I say I prefer to go with hash() as key/task name if you really think the inconsistency is an issue.

select = Select(recv1, recv2)
async for ready_set in select.ready():
if recv1 in ready_set:
msg = recv1.consume()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users having to call consume directly make me a bit uneasy. I would much rather advertise using await recv.receive() here, because all our receiver implementations guarantee that until a previous value is read, subsequent calls to ready() don't overwrite unread values in the receiver.

That way, normal users only have to keep one thing receive() in mind, or they can start doing val = await anext(recv) from python 3.10.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it make you uneasy? I don't see the difference between using await recv.receive() or recv.consume() from the safety POV, if you got your receiver from the ready_set, it is guaranteed that ready() was called, so there is something to consume. Even more, it is guaranteed that you won't get blocked when reading the next value, which IMHO is a nice property to have, you know the entire loop won't block once receivers are ready. I don't see why one would like to await for something that it is guaranteed to be there.

Am I missing or misunderstanding anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because it is very important to provide a single simple way of doing things, that's the Python way.

You have many months worth of familiarity with this code, but for someone who is new to this, it will look like a mysterious thing, and they'll have to read the documentation to read what consume is, and they'll have to make sense of the design etc, whereas all they wanted was a way to quickly get a select.

it is guaranteed that you won't get blocked when reading the next value

the guarantee remains if we call receive on a ready channel as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because it is very important to provide a single simple way of doing things, that's the Python way.

I agree with this, but I think looping an async iterator is a different thing that awaiting a single message. The whole point of an async iterator is that it does the await for you.

You have many months worth of familiarity with this code, but for someone who is new to this, it will look like a mysterious thing, and they'll have to read the documentation to read what consume is, and they'll have to make sense of the design etc, whereas all they wanted was a way to quickly get a select.

For me it is not that mysterious because one is async (receive() and the other one is sync (consume()), that's what makes it much clear IMHO, if both were sync or async I agree it would be 2 ways to do the same thing.

the guarantee remains if we call receive on a ready channel as well.

Yes, it is just less efficient because you are adding unnecessary context switches.

Also, nobody is preventing users from call await recv.receive() instead, and it will work and do what the user expects, so I don't see a big issue here.

In any case, if this is about documentation or how we present examples, we could explain more explicitly why consume() is recommended (which I think we should recommend anyway) and mention that of course await recv.receive() will work too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that is acceptable for you I can improve the examples and docs.

@@ -117,82 +80,41 @@ async def stop(self) -> None:
await asyncio.gather(*self._pending, return_exceptions=True)
self._pending = set()

async def ready(self) -> bool:
async def ready(self) -> AsyncIterator[Set[Receiver[Any]]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By returning Sets from the iterator, we end up losing two features present in the current implementation, that protect users from hard-to-debug data loss:

  1. If a user writes elif instead of if for a condition check, the current implementation ensures that there will be no data loss, but the proposed design doesn't seem to have that guarantee. This could lead to hard-to-detect/hard-to-reproduce bugs, in most of our (relatively slow) streams, because there will be data loss only on those rare cases where multiple receivers become ready in the same Select iteration.

  2. If someone removes an if check, but forgets to remove the corresponding receiver from the Select call (or if they add it to Select and forget to check for it), the current implementation prints warnings, whereas these get silently ignored in the proposed design.

Both these features are essential in my opinion, and they can be addressed by returning a custom type that holds the set/list objects instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand 1. This implementation is heavily focus on preventing data loss. It is the other way around, in the previous implementation if you write a elif then the message will be explicitly dropped for not handled messages, while this implementation will never drop a message unless consume() is called, which can be only be done by the user. Worse case in this implementation, if you use elif one of your receivers could starve if a receiver above it has messages all the time, otherwise eventually the elif will be hit and the receiver.consume() will get the message.

Or maybe you are talking about the current implementation dropping messages working as a mechanism to make the receiver's queue move even in the event of bugs? In this implementation if messages are never processed the queue will fill up and messages will be dropped on the other end. But again, this will only happen if you have a bug AND you have a receiver that is receiving messages on every loop.

About 2., you would have to remove the if check, not remove the receiver from the Select BUT keep the receiver around for this to be a problem, because if you remove the receiver entirely, then you'll get an error about the undefined symbol when trying to use it in Select. It also looks to me an obscure enough case to design the API around preventing that.

For me adding an abstraction on top of the naked receiver just makes the API (and implementation) more complicated to only cover for what IMHO is an obscure usage bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the other way around, in the previous implementation if you write a elif then the message will be explicitly dropped for not handled messages

I don't think this is correct. Why do you think that this is the case? Have you tested this/seen this behaviour somewhere or are you just guessing?

Worse case in this implementation, if you use elif one of your receivers could starve if a receiver above it has messages all the time, otherwise eventually the elif will be hit and the receiver.consume() will get the message.

This sounds very bad for a general purpose Select implementation. I'd say this alone is a good enough reason for introducing a custom response type.

It also looks to me an obscure enough case to design the API around preventing that.

it is not obscure at all, it can happen all the time, especially if this is well used, and this is the reason we have things like exhaustive matching these days, like in the new python match statement.

For example, imagine writing a select with 6 receivers, and one of them is a config update channel and you forget to handle it in the select block. Then when there are config changes, you'll get a notification only when the buffer fills up, and the buffer has a default size of 50.

There can be several similar scenarios. So I think this definitely needs to be handled.

Copy link
Contributor

@shsms shsms Feb 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the other way around, in the previous implementation if you write a elif then the message will be explicitly dropped for not handled messages

This is not accurate. Just tested:

import asyncio

from frequenz.channels import Broadcast
from frequenz.channels.util import Select


async def test():
    chan1 = Broadcast("")
    chan2 = Broadcast("")

    select = Select(c1=chan1.new_receiver(), c2=chan2.new_receiver())

    for ii in range(10):
        await chan1.new_sender().send(ii)
        await chan2.new_sender().send(ii + 10)

    await chan1.close()
    await chan2.close()

    while await select.ready():
        print(f"ready items: {select._ready_count}")
        if val := select.c1:
            print(f"Chan1 says: {val.inner}")
        elif val := select.c2:
            print(f"Chan2 says: {val.inner}")

        if select._ready_count == 0:
            print()


asyncio.run(test())
that produces:
ready items: 2
Chan1 says: 0
ready items: 1
Chan2 says: 10

ready items: 2
Chan1 says: 1
ready items: 1
Chan2 says: 11

ready items: 2
Chan1 says: 2
ready items: 1
Chan2 says: 12

ready items: 2
Chan1 says: 3
ready items: 1
Chan2 says: 13

ready items: 2
Chan1 says: 4
ready items: 1
Chan2 says: 14

ready items: 2
Chan1 says: 5
ready items: 1
Chan2 says: 15

ready items: 2
Chan1 says: 6
ready items: 1
Chan2 says: 16

ready items: 2
Chan1 says: 7
ready items: 1
Chan2 says: 17

ready items: 2
Chan1 says: 8
ready items: 1
Chan2 says: 18

ready items: 2
Chan1 says: 9
ready items: 1
Chan2 says: 19

ready items: 2
Chan1 says: None
ready items: 1
Chan2 says: None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct. Why do you think that this is the case? Have you tested this/seen this behaviour somewhere or are you just guessing?

No, I guess my memory is only faulty. We had this exact same conversation a while ago and you also explained that elif was OK but I guess for some reason that didn't stuck with me and I kept the misconception in my mind all the time. It's all in:

You said there that using break was problematic, but now I don't understand why can that be the case, unless you break the loop before processing any messages at all, as the current select will drop messages if no messages were consumed at all in one loop iteration (I still don't understand why is that but that's a different topic), but doing:

    while await select.ready():
        print(f"ready items: {select._ready_count}")
        if val := select.c1:
            print(f"Chan1 says: {val.inner}")
            break
        if val := select.c2:
            print(f"Chan2 says: {val.inner}")

Should be fine too because if the break was reached, it was because at least one receiver message was consumed. I'm not sure why we thought that was a problem in that issue anymore.

What should drop messages is if you forget to handle one case inside the loop, and you only receive a message for that receiver.

OK, so let's forget about this for the current implementation. The point is the current implementation doesn't drop message under any circumstances. :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, imagine writing a select with 6 receivers, and one of them is a config update channel and you forget to handle it in the select block. Then when there are config changes, you'll get a notification only when the buffer fills up, and the buffer has a default size of 50.

OK, so you are not saying the case where you remove some receiver but when you are writing a select loop for the first time and forgot to handle one case. Fair enough then. I agree it would be idea to leverage match here then, if there is a way to let mypy tell you if a case is not being properly handled at "compile time" instead of runtime, that would be awesome.

I'll try to look into this option and if not we keep the runtime check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be fine too because if the break was reached, it was because at least one receiver message was consumed. I'm not sure why we thought that was a problem in that issue anymore.

Just for the records, this was an issue and it was fixed:

Copy link
Contributor

@shsms shsms Feb 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You said there that using break was problematic, but now I don't understand why can that be the case, unless you break the loop before processing any messages at all,

yes, break was problematic, and we fixed it by splitting receive into ready and consume.

as the current select will drop messages if no messages were consumed at all in one loop iteration (I still don't understand why is that but that's a different topic)

This allows us to log a warning about an unread message as soon as we know it is unread, for each missed message, because that is a clear bug in user code.

Worse case in this implementation, if you use elif one of your receivers could starve if a receiver above it has messages all the time, otherwise eventually the elif will be hit and the receiver.consume() will get the message.

This sounds very bad for a general purpose Select implementation. I'd say this alone is a good enough reason for introducing a custom response type.

This is true for the current implementation too. I agree it would be good to prevent it, but I would leave it for another PR as it is a different enhancement.

no, the current implementation doesn't starve any of the receivers, whether or not elif is used, as demonstrated in the example I shared above. And this feature is going away in this PR.

OK, so you are not saying the case where you remove some receiver but when you are writing a select loop for the first time and forgot to handle one case

yes, I mean there can be we can get into such a situation from either direction, and sometimes it will be a slow but critical thing and we might not notice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows us to log a warning about an unread message as soon as we know it is unread, for each missed message, because that is a clear bug in user code.

I'm not sure about this. Even when convoluted, you could check some condition before consuming something that is ready and decide to not consume it yet and give the loop another spin. I would rather avoid dropping messages under any circumstance. Logging is OK, as it is likely a bug, and certainly not good programming if it isn't, but dropping is going a bit too far IMHO. If the code has a bug you won't make it better by just dropping messages and continue with your life as if nothing happened.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the current implementation doesn't starve any of the receivers, whether or not elif is used, as demonstrated in the example I shared above. And this feature is going away in this PR.

Ah, right, because you don't schedule the receiver that was already consumed to receive again until all other ready receivers were consumed. I see 👍

@leandro-lucarella-frequenz
Copy link
Contributor Author

OK, so this review got a bit deep. Thanks for the thoughtful and in depth review, and bearing with me with all the clarifications, I definitely underestimated some of the complexity that was there and was there for a reason 😬

To summarize these are my take-aways:

Things to change for sure:

  • Rename ready() to __aiter__() [*]
  • Clarify in the examples that msg = await recv.receive() can be used too, but it would be less efficient than msg = recv.consume() and explain a bit better the differences.
  • Favor using hash() as the key for the internal dict/name for the tasks if we keep returning a set (which is probably not the case because of the starvation thing)

As to avoid starvation. I see 2 main paths:

  1. Leave the current mechanism with some wrappers and keeping an internal state of what was consumed and what wasn't.
  2. Revive the one-by-one approach somehow randomize which receiver is returned.

As to avoid missing receiver handlers in the loop, also 2 main alternatives:

  1. Look into match to see if it can help us
  2. Keep using a wrapper to keep track that all possibilities were covered.

What I don't like that much about the approach above, is there is no way to add/remove receivers to the select. I know we said if you need to do this is probably not a use-case for select, but still it makes noise in my mind, select seems to be a very convenient tool for those cases too.

Let me know if there is something missing or wrong.

@shsms
Copy link
Contributor

shsms commented Feb 17, 2023

Yup, I think we are in agreement about the requirements. I have developed a slight preference for the one-by-one approach.

It looks like in any case, we'll need a wrapper, unless there's some pattern matching magic that would help with detecting missing handlers.

@leandro-lucarella-frequenz
Copy link
Contributor Author

Yup, I think we are in agreement about the requirements. I have developed a slight preference for the one-by-one approach.

I was thinking of having the one-by-one approach as the default and the async iterator interface. This will even consume() the message, so we don't even need to recommend consume() or receive() and we also don't run into the problem of what to do if the user didn't consume the message (to drop or not to drop). The only problem with this issue is how to handle exceptions, because the async for loop will raise as a whole if there is no explicit consuming, but I'll think about it.

Then maybe provide an advance interface for getting multiple ready receivers at once, for cases with more advanced usage like the mentioned above. For this method if we can have safety mechanisms in place to avoid the mentioned common bugs, then great, otherwise it should be clearly flagged as unsafe (like async for ready_set in select.unsafe.all_ready).

It looks like in any case, we'll need a wrapper, unless there's some pattern matching magic that would help with detecting missing handlers.

👍

name = task.get_name()
recv = self._receivers[name]
# This will raise if there was an exception in the task
# Colloect or not collect exceptions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Colloect?

@leandro-lucarella-frequenz
Copy link
Contributor Author

Closing as the solution we decided on is different to this one, so we can start fresh with a new PR.

@llucax
Copy link
Contributor

llucax commented Jun 8, 2023

Superseded by #114

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
part:docs Affects the documentation part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:tests Affects the unit, integration and performance (benchmarks) tests resolution:wontfix This will not be worked on type:enhancement New feature or enhancement visitble to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make Select.ready() an async iterable
5 participants