Add & use close channel method in the registry #225

Conversation

mathias-baumann-frequenz (Contributor)

Add all required code to automatically close channels when we're done sending
values in the mock_microgrid.

Required for the datasourcing actor benchmark.

  • Add private method to allow closing of channels
  • Update mock_microgrid to close channels when done sending.
  • Update datasourcing actor to close channels.

@github-actions github-actions bot added part:actor Affects an actor or the actors' utilities (decorator, etc.) part:tests Affects the unit, integration and performance (benchmarks) tests labels Feb 24, 2023
@mathias-baumann-frequenz mathias-baumann-frequenz added this to the v0.19.0 milestone Feb 24, 2023
@mathias-baumann-frequenz mathias-baumann-frequenz force-pushed the close_ze_channel branch 2 times, most recently from 1194435 to 4fa20ee Compare March 2, 2023 13:00
ela-kotulska-frequenz (Contributor) commented Mar 2, 2023

Please update the release notes, since this change might affect users.

@github-actions github-actions bot added the part:docs Affects the documentation label Mar 2, 2023
Comment on lines 346 to 379
-        api_data_receiver = self.comp_data_receivers[comp_id]
+        api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id]

-        def process_msg(data: Any) -> None:
-            tasks = []
-            for extractor, senders in stream_senders:
-                for sender in senders:
-                    tasks.append(sender.send(Sample(data.timestamp, extractor(data))))
-            asyncio.gather(*tasks)
-
         async for data in api_data_receiver:
-            process_msg(data)
+            for extractor, senders in stream_senders:
+                for sender in senders:
+                    await sender.send(Sample(data.timestamp, extractor(data)))
+
+        await asyncio.gather(
+            *[
+                # pylint: disable=protected-access
+                self._registry._close_channel(r.get_channel_name())
+                for requests in self._req_streaming_metrics[comp_id].values()
+                for r in requests
+            ]
+        )

     async def _update_streams(
Contributor

This is definitely simpler, but it is changing the semantics and I'm not sure if it is OK.

Before, you were sending all samples in the background, and now you are sending them sequentially. So let's say you need to send sample1 and sample2; you are effectively doing:

await send(sample1)
await send(sample2)

So now if for some reason send(sample1) gets stuck for 60 seconds, sample2 will not be sent for that amount of time either, nor will you receive new messages, so the whole data pipeline will get stuck.

I think the background sending is a good idea and probably shouldn't be removed (although having it unbounded is not a good idea IMHO, because if there is something funky with sending, this will create tasks indefinitely); it is just that the implementation was too obscure (probably to get it working quickly and easily).

The part of sending the data for one received sample in parallel is easy, as it is bounded anyway:

        async for data in api_data_receiver:
            await asyncio.gather(*[
                sender.send(Sample(data.timestamp, extractor(data)))
                for extractor, senders in stream_senders
                for sender in senders])

This at least will do all the send() calls in parallel, but again, if there is just one send() that gets blocked, then you'll stop processing new data until it is done, which is probably not what we want.

Again, a more resilient implementation IMHO should have a task manager that looks at the age of pending send tasks and cancels them if they are too old, so they are not dangling forever; but implementing this is a non-trivial amount of effort, so I wouldn't do it in this PR.
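For illustration, a minimal sketch of such a task manager (all names here are invented, and the 60-second threshold is just an example):

import asyncio
import time
from typing import Any, Coroutine

class SendTaskManager:
    """Tracks background send tasks and cancels any that are pending too long."""

    def __init__(self, max_age: float = 60.0) -> None:
        self._max_age = max_age
        self._started: dict[asyncio.Task[Any], float] = {}

    def start(self, coro: Coroutine[Any, Any, Any]) -> None:
        """Run a send in the background, remembering when it started."""
        task = asyncio.create_task(coro)
        self._started[task] = time.monotonic()
        task.add_done_callback(lambda t: self._started.pop(t, None))

    def reap(self) -> None:
        """Cancel tasks that have been pending longer than max_age."""
        now = time.monotonic()
        for task, started in list(self._started.items()):
            if now - started > self._max_age:
                task.cancel()

Calling reap() periodically (e.g. from the receive loop) would keep the number of dangling send tasks bounded.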

Contributor Author

Is there an actual scenario where it could get stuck though?

As far as I know, the receiver queue will just drop the oldest entry, so it shouldn't actually get stuck.

I just looked a bit closer at the underlying mechanisms. The only reason send() is async is that we do

async with self._chan.recv_cv:  # acquires the Condition's lock
    self._chan.recv_cv.notify_all()

and in the receiver we do

        while len(self._q) == 0:
            if self._chan.closed:
                return False
            async with self._chan.recv_cv:  # acquire the lock
                await self._chan.recv_cv.wait()  # release the lock and wait()

And I am wondering... is that really necessary? Why was a Condition used here and not simply an Event? (Reminder: a Condition is a lock + an event.)
I currently can't think of any reason why we would need a lock here. All our operations on the queue are atomic (from a task perspective): either we successfully push/pop or we drop the value; there is no waiting for other tasks during that operation.

And while looking at this, I also looked at https://docs.python.org/3/library/asyncio-queue.html; this class already implements everything that is re-implemented in Broadcast & friends, only it uses an Event as well, instead of a Condition.

So if there isn't a big reason we're using a lock here, then I think we can actually make the whole send() method sync instead of async and remove a whole class of possible problems, and certainly some complexity.
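For illustration, a minimal sketch of what an Event-based, sync-send channel could look like (hypothetical code, not the actual frequenz-channels implementation):

import asyncio
from collections import deque
from typing import Any

class EventChannel:
    """Sketch: wake receivers with an Event instead of a Condition."""

    def __init__(self, maxlen: int = 50) -> None:
        self._q: deque[Any] = deque(maxlen=maxlen)  # a full queue drops the oldest entry
        self._new_data = asyncio.Event()
        self.closed = False

    def send(self, msg: Any) -> None:
        # Sync: append and wake any waiting receiver; no lock, no await.
        self._q.append(msg)
        self._new_data.set()

    async def recv(self) -> Any:
        while not self._q:
            if self.closed:
                raise EOFError("channel closed")
            self._new_data.clear()
            await self._new_data.wait()
        return self._q.popleft()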

Thoughts?

Contributor Author

And as an additional thought: why not just use the asyncio.Queue class?
The comment in my local file says

    """A queue, useful for coordinating producer and consumer coroutines.

which sounds exactly like what we're doing, including max length and everything.
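For reference, using the stdlib class directly looks roughly like this (plain asyncio.Queue API, independent of the channels library):

import asyncio

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=50)

    async def producer() -> None:
        for i in range(3):
            await queue.put(i)  # only blocks when the queue is full
        await queue.put(-1)  # sentinel to stop the consumer

    async def consumer() -> None:
        while (item := await queue.get()) != -1:
            print("got", item)

    await asyncio.gather(producer(), consumer())

asyncio.run(main())

There is also put_nowait(), which is sync and raises QueueFull instead of waiting.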

Contributor Author

Well, now you did it :P I tried my idea and here is the result: #233

Contributor

So now if for some reason send(sample1) gets stuck for 60 seconds, sample2 will not be sent for that amount of time either,

@leandro-lucarella-frequenz in that case gather won't help very much.
As you said, we will send sample2, but we won't send another message. If we think this is possible, then we should wait with a timeout (using asyncio.wait).
I agree that background sending is better, but on the other hand: a simple await is 2.5 times faster than asyncio.create_task, so maybe it is worth it? We create a HUGE number of these tasks per second... (a battery sends data every 0.2 s and we need a task for each metric).
I guess asyncio.create_task is useful for operations a little heavier than putting a value on a queue.
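The relative cost is easy to sanity-check with a micro-benchmark along these lines (illustrative only; absolute numbers will vary by machine and Python version):

import asyncio
import time

async def noop() -> None:
    pass

async def bench(n: int = 100_000) -> None:
    start = time.perf_counter()
    for _ in range(n):
        await noop()  # direct await: no Task object is created
    direct = time.perf_counter() - start

    start = time.perf_counter()
    for _ in range(n):
        await asyncio.create_task(noop())  # wraps the coroutine in a Task first
    via_task = time.perf_counter() - start

    print(f"await: {direct:.3f}s  create_task: {via_task:.3f}s")

asyncio.run(bench())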

I think we can actually make the whole send() method sync

Mathias, nice catch. But I would wait for Sahas before deciding; he might have had reasons for doing it like that.

And as additional thought: Why not just use the asyncio.Queue class?

I think (but I am not sure) that we ran some tests in the past and it turned out asyncio.Queue is very slow... On the other hand, if it allows send to be sync, then maybe it will improve performance.
I can't find this performance test, @leandro-lucarella-frequenz, do you remember where we store them?

Contributor

Is there an actual scenario where it could get stuck though?

As far as I know, the receiver queue will just drop the oldest entry, so it shouldn't actually get stuck.

Yes, the thing is that when you await, you lose the CPU, so even if the send doesn't block, the loop might schedule another task that takes a long time to process.

So if there isn't a big reason we're using a lock here, then I think we can actually make the whole send() method sync instead of async and remove a whole class of possible problems, and certainly some complexity.

Yes, I think this would be the correct approach. I think we should have a distinction between sync and async senders in the channels (I don't think we have real async senders for now, but there is an issue about creating a blocking channel, where send() will wait if the queue is full) if we want to give guarantees about some sender not blocking; otherwise you can't really control whether the loop will give you back the CPU or not.
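A hypothetical sketch of what that split could look like (not the actual channels API; see the issue linked below):

from abc import ABC, abstractmethod
from typing import Generic, TypeVar

T = TypeVar("T")

class SyncSender(ABC, Generic[T]):
    """Never yields to the event loop, so it cannot stall the caller."""

    @abstractmethod
    def send(self, msg: T) -> None:
        ...

class AsyncSender(ABC, Generic[T]):
    """May suspend, e.g. waiting for room in a bounded channel."""

    @abstractmethod
    async def send(self, msg: T) -> None:
        ...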

I think (but I am not sure) that we ran some tests in the past and it turned out asyncio.Queue is very slow... On the other hand, if it allows send to be sync, then maybe it will improve performance.
I can't find this performance test, @leandro-lucarella-frequenz, do you remember where we store them?

Nope, I don't remember any performance tests with queue.

But I think we all agree we'd like to make send() sync where possible, so I think we can close this discussion with the following task list:

  • Create an issue in the channels repo about having sync and async senders (and make all current senders that can be sync, sync): Implement SyncSender and AsyncSender frequenz-channels-python#76
  • Leave this PR implementation as is, as it should work with the current implementation (under certain assumptions, e.g. that yielding in send() won't starve the current task for too long) and with future changes
  • Add a comment explaining the current situation (with a link to the channels issue)

Agreed?

Contributor Author

And what about the idea I showcased in frequenz-floss/frequenz-channels-python#75 and #233?

Contributor

Answered in the PR.

Contributor

Again, I wouldn't block this PR until something is fixed in the channels repo; let's go with the assumption that at some point this send will be sync. I already created the issue in the channels repo.

It would be interesting if you could add timing info here and in the channels issue (frequenz-floss/frequenz-channels-python#76) so we can have a better understanding of how much faster things could be if send() is sync.

tests/utils/mock_microgrid.py (outdated review thread, resolved)
src/frequenz/sdk/actor/_channel_registry.py (outdated review thread, resolved)

mathias-baumann-frequenz (Contributor Author)

I updated the branch with the recent feedback, but also removed the controversial commit regarding send() in the data sourcing actor and went back to counting the pending tasks before exiting the method.

We can still update it later once we've decided exactly how.

Comment on lines +357 to +368
            # Done-callback bookkeeping: one background send finished.
            nonlocal pending_messages
            pending_messages -= 1
            if pending_messages == 0:
                senders_done.set()

        async for data in api_data_receiver:
            pending_messages += 1
            senders_done.clear()
            process_msg(data)

        # Wait until every pending background send has completed.
        while pending_messages > 0:
            await senders_done.wait()
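For context, a self-contained sketch of how the counter and the event fit together (the enclosing function, its signature, and the process_msg body are reconstructed here for illustration, not copied from the PR):

import asyncio
from typing import Any

async def stream_component_data(api_data_receiver: Any, stream_senders: Any) -> None:
    pending_messages = 0
    senders_done = asyncio.Event()

    def on_sent(_fut: asyncio.Future[Any]) -> None:
        # One background fan-out finished; maybe everything is done now.
        nonlocal pending_messages
        pending_messages -= 1
        if pending_messages == 0:
            senders_done.set()

    def process_msg(data: Any) -> None:
        # Fan the sample out to all senders without blocking the receive loop.
        fut = asyncio.gather(
            *[
                sender.send(data)
                for _extractor, senders in stream_senders
                for sender in senders
            ]
        )
        fut.add_done_callback(on_sent)

    async for data in api_data_receiver:
        pending_messages += 1
        senders_done.clear()
        process_msg(data)

    # Don't return (and close the channels) until every send has finished.
    while pending_messages > 0:
        await senders_done.wait()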
Contributor

I won't argue, but to remove all this complexity we could do:

async for data in api_data_receiver:
    await process_msg(data)

This is also much safer because:

  • we will send data in order (everywhere in the code we assume correct ordering);
  • api_data_receiver has maxsize = 50, so if we exceed this number we start to see logs that messages get lost because we are not processing them in time;
  • in the current solution we won't see any logs, we will just produce an unbounded number of async tasks, which could block everything.

Contributor Author

You mean we await the asyncio.gather(*tasks) call? In that case I don't need all the complexity here. But didn't we want to wait for Sahas before making such changes? :)

Contributor Author

I tried to keep the impact on the code behavior low because of that.

Contributor

Sounds good to me. I think whatever we do now is likely to change if we go for a sync send(). We are just speculating now.

Contributor

IMHO this is good to go; we can update the sending logic after we've figured out the sync send(). But @ela-kotulska-frequenz, do please raise your voice if you don't agree :)

Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
When the senders close the channels, the data sourcing actor will also close its sending channels.

Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
mathias-baumann-frequenz (Contributor Author)

Rebased on latest changes.

@mathias-baumann-frequenz mathias-baumann-frequenz merged commit 6b7f178 into frequenz-floss:v0.x.x Mar 9, 2023
@mathias-baumann-frequenz mathias-baumann-frequenz deleted the close_ze_channel branch March 9, 2023 13:27
Labels
part:actor Affects an actor or the actors' utilities (decorator, etc.) part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests
Development

Successfully merging this pull request may close these issues.

Add close channel feature to Datasourcing Actor
5 participants