
Full in memory channel layer #863

Merged: 20 commits, Feb 13, 2018
Conversation

@svenauhagen (Contributor):

Hi,

I added the full in memory layer back to channels 2.
We always used it in channels 1 for local testing.

Can you merge it into the code?

Best and thanks
Sven

@svenauhagen svenauhagen changed the title Fill in memory channel layer Full in memory channel layer Feb 5, 2018
@andrewgodwin (Member):

I will happily take this once it has unit tests (I'll write the documentation part) - you may want to reuse the channels_redis unit tests for the most part.

@svenauhagen (Contributor, author):

I ported over the channels_redis tests to the in-memory layer.
Let me know if that is alright or if you want me to add additional tests.

@andrewgodwin (Member) left a review:

Having looked over the code more, I'm not sure you need all the complexity that the Redis client has with receiving loops - you should be able to use asyncio Queue objects for most of the operations here and await on them natively (and there's no need to pack and unpack around things with ! in them as that's purely a network efficiency thing).

Do you think you'd be able to reduce it down to have less of the receive_loop stuff? The version you're replacing was a lot simpler.
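For illustration, a minimal sketch of the kind of queue-based layer being suggested here might look like the following. All names and structure are hypothetical, not the merged implementation: per-channel asyncio.Queue objects are awaited natively, with no receive loop.

```python
import asyncio


class MinimalInMemoryLayer:
    """Illustrative sketch only: one asyncio.Queue per channel name."""

    def __init__(self):
        self.channels = {}  # channel name -> asyncio.Queue

    async def send(self, channel, message):
        # Create the queue on first use and put the message on it.
        queue = self.channels.setdefault(channel, asyncio.Queue())
        await queue.put(message)

    async def receive(self, channel):
        # Await directly on the queue; the event loop handles the waiting.
        queue = self.channels.setdefault(channel, asyncio.Queue())
        return await queue.get()


async def demo():
    layer = MinimalInMemoryLayer()
    await layer.send("test-channel", {"type": "hello"})
    return await layer.receive("test-channel")


print(asyncio.run(demo()))  # {'type': 'hello'}
```

Because the queue is awaited directly, there is no need for the channels_redis-style receive loop or for packing/unpacking channel names containing `!`.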

@svenauhagen (Contributor, author):

Ah yes, I wasn't aware of the asyncio queue.
I am new to asyncio; I just learned it over the weekend to write the in-memory channel layer :)

Do I understand you correctly that I should simply use the raw channel name for the queue key?
Should I also dismiss the assert self.valid_channel_name(channel, receive=True) check then?

@andrewgodwin (Member):

You can leave the valid check in, but there's no need to do the part where it takes a name with a ! in it and splits it into a non-local part, and puts the real channel name into the message dict.

@svenauhagen (Contributor, author):

Alright, can you look at it now?
I tested it today and it worked well that way.

In-memory channel layer implementation for testing purposes.
"""
'''
Our own in memory layer
Contributor:

Did you mean to change the docstring?

Contributor (author):

No, that was left in there from testing. I changed it to "In-memory channel layer implementation" since it can be used for more than testing now.

queue = self.channels.setdefault(channel, asyncio.Queue())

# Do a plain direct receive
_, message = await queue.get()
Member:

I would add something here to clean up the queue entry in the self.channels dict if it's now empty.
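A sketch of what such cleanup might look like (hypothetical helper, not the merged code). Note that an asyncio.Queue is always truthy, so emptiness has to be checked with queue.empty():

```python
import asyncio


async def receive(channels, channel):
    # Hypothetical sketch: receive one message, then drop the queue
    # from the dict if nothing else is waiting in it.
    queue = channels.setdefault(channel, asyncio.Queue())
    message = await queue.get()
    # An asyncio.Queue is always truthy, so check emptiness explicitly.
    if queue.empty():
        del channels[channel]
    return message


async def demo():
    channels = {}
    await channels.setdefault("chan", asyncio.Queue()).put({"type": "msg"})
    message = await receive(channels, "chan")
    return message, "chan" in channels


print(asyncio.run(demo()))  # ({'type': 'msg'}, False)
```

In a real layer you would also need to make sure no other coroutine is still awaiting the deleted queue, otherwise messages sent afterwards would go to a fresh queue and be lost to the old waiter.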

"""
'''
In-memory channel layer implementation
'''
Member:

Please change these back to """ - trying to keep everything consistent!

self.channels = {}
self.groups = {}
self.thread_lock = threading.Lock()
Member:

I'm not sure a threading.Lock makes sense here - you shouldn't need it since you're writing async code. Did you find a reason to add it?

self._remove_from_groups(channel)
# Is the channel now empty and needs deleting?
if not queue:
    del self.channels[channel]
Member:

This method should also clean expired group memberships separately.

Contributor (author):

What do you mean by that exactly?
Should I remove the channel from all groups if not queue?

Member:

You need to go through the group memberships and expire any entry whose age is greater than group_expiry (there should be a test for this too, but I'll add that later). The Redis layer does it here: https://github.com/django/channels_redis/blob/master/channels_redis/core.py#L316
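As a sketch of the expiry pass being described, assuming groups maps group name to a dict of channel name to join timestamp (names and default are illustrative, not the merged code):

```python
import time


def clean_expired_groups(groups, group_expiry=86400):
    """Illustrative sketch: drop any group membership older than
    group_expiry seconds, modelled on the channels_redis behaviour."""
    timeout = int(time.time()) - group_expiry
    # Iterate over list() snapshots so deletion is safe during the loop.
    for group in list(groups):
        for channel in list(groups[group]):
            if groups[group][channel] < timeout:
                del groups[group][channel]


groups = {"chat": {
    "fresh": int(time.time()),           # joined just now
    "stale": int(time.time()) - 100000,  # joined over a day ago
}}
clean_expired_groups(groups)
print(sorted(groups["chat"]))  # only the fresh membership survives
```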

@andrewgodwin (Member) left a review:

Looking good - just a few small cleanups, but this is getting close to merge! Thanks for your work so far.

@svenauhagen (Contributor, author):

Great, I will get on these cleanups later today.
Regarding the thread lock I do have a general question since I am not sure.
How is daphne handling new web requests in 2.0?
Is it spawning a thread for each request or is it entirely done with asyncio coroutines?

@andrewgodwin (Member):

It's all coroutines. Some requests will end up using threads if they're based on SyncConsumer, but all the async stuff (calls to channel layers included) happens in a single main thread on a single event loop.

@svenauhagen (Contributor, author):

Thanks for the explanation.
Isn't that a problem, though? If some async requests run for a long time without awaiting, won't Daphne be blocked from answering new requests?

@andrewgodwin (Member):

No, because they're async requests, as long as they await anything that's long running (as everything is, like receiving off the channel layer) the event loop frees up. You can have thousands of coroutines in the same event loop easily.
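That point is easy to demonstrate in isolation: a thousand coroutines can all be blocked on awaits concurrently, in one thread, on one event loop (a standalone illustration, not Daphne internals):

```python
import asyncio


async def handler(queue):
    # Each "request" awaits its own queue; while it waits, the
    # event loop is free to run every other coroutine.
    return await queue.get()


async def demo():
    queues = [asyncio.Queue() for _ in range(1000)]
    tasks = [asyncio.ensure_future(handler(q)) for q in queues]
    # Deliver a message to every waiting handler.
    for i, q in enumerate(queues):
        await q.put(i)
    return await asyncio.gather(*tasks)


results = asyncio.run(demo())
print(len(results))  # 1000 handlers served by a single thread
```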

@svenauhagen (Contributor, author):

OK, I think I got it now.

@@ -267,7 +266,8 @@ def _clean_expired(self):
         for group in self.groups:
             for channel in self.groups.get(group, set()):
                 # If join time is older than group_expiry end the group membership
-                if self.groups[group][channel] and int(self.groups[group][channel]) < (int(time.time()) - self.group_expiry):
+                if (self.groups[group][channel] and
+                        int(self.groups[group][channel]) < (int(time.time()) - self.group_expiry)):
                     del self.groups[group][channel]
Member:

Maybe a shortcut local variable would be better.

Contributor (author):

I added a timeout variable.

@andrewgodwin (Member):

Looks good! I am ready to merge this if you are.

@brubbel commented Feb 24, 2018:

@svenauhagen @andrewgodwin

channels/channels/layers.py

Lines 268 to 272 in d6643d9

for channel in self.groups.get(group, set()):
    # If join time is older than group_expiry end the group membership
    if self.groups[group][channel] and int(self.groups[group][channel]) < timeout:
        # Delete from group
        del self.groups[group][channel]

In Python 3, looping over a dict/set uses an iterator, and _clean_expired() raises an exception because changing the dict/set during iteration is not allowed.

for channel in self.groups.get(group, set()):

should be

for channel in list(self.groups.get(group, set())):

Error below:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/xyz/venv3/lib/python3.5/site-packages/asgiref/sync.py", line 49, in __call__
    return call_result.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 405, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/home/xyz/venv3/lib/python3.5/site-packages/asgiref/sync.py", line 63, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/home/xyz/django/abc/consumers.py", line 47, in sender
    await self.channel_layer.group_send(self.MY_GROUP, message)
  File "/home/xyz/venv3/lib/python3.5/site-packages/channels/channels/layers.py", line 321, in group_send
    self._clean_expired()
  File "/home/xyz/venv3/lib/python3.5/site-packages/channels/channels/layers.py", line 268, in _clean_expired
    for channel in self.groups.get(group, set()):
RuntimeError: dictionary changed size during iteration

@andrewgodwin (Member):

I just pushed up a commit to fix that.

@brubbel commented Feb 25, 2018:

I would like to point out some additional issues.

channels/channels/layers.py

Lines 261 to 263 in b58c213

# Is the channel now empty and needs deleting?
if not queue:
    del self.channels[channel]

Previously (before asyncio), self.channels had been using a collections.deque, for which the following is valid to check if queue is empty:

if not queue:
     do_something()

However, the truth value of a queue.Queue or asyncio.Queue cannot be used to check whether the queue is empty: it is always True, so the channel is never deleted. The correct statement is as follows:

if queue.empty():
     do_something()

With the above corrected statement effectively purging channels from the self.channels dict, it also starts to delete non-expired (empty, but still active) channels. This breaks InMemoryChannelLayer again.
The corrected statement, which only purges expired channels, is as follows:

if remove and queue.empty():
    del self.channels[channel]
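The truthiness point is easy to verify directly:

```python
import asyncio

queue = asyncio.Queue()
# An empty asyncio.Queue has no __bool__ or __len__ that reflects its
# contents, so it is truthy even when empty.
print(bool(queue))    # True even though the queue is empty
print(queue.empty())  # True: the reliable emptiness check
```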

@brubbel commented Feb 25, 2018:

Even with the above fixes, the InMemoryChannelLayer keeps leaking channel objects in the self.channels dict as websocket connections come and go over time. The group_discard() method is called, which is correct, but channels with an empty queue keep hanging around.

I suggest the following fixes to _group_expire(), please comment if I missed something:
brubbel@ad6b135

@andrewgodwin (Member) commented Feb 25, 2018:

Can you open a separate issue to track this? It's getting complex enough it needs one.

It's also worth noting that the in-memory channel layer is really only meant for testing, so fixes to its performance/longevity are going to be lower priority than some other bugs in the queue!

@brubbel commented Feb 25, 2018:

I am fully aware that the in-memory channel layer is for debugging only; however, imho, for applications on a lightweight system such as a Raspberry Pi it is a reasonable alternative.

@andrewgodwin (Member):

Right, just saying that fixing this is going to come below the 6 or so other bugs I have to look at at the moment!
