Skip to content

Commit

Permalink
reduce memory usage of Connection (#377)
Browse files Browse the repository at this point in the history
* reduce memory usage of Connection

* allow ValueError on _used_channel_ids.remove
  • Loading branch information
pawl committed Dec 14, 2021
1 parent fb162f2 commit be6b5ed
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
6 changes: 5 additions & 1 deletion amqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ def collect(self):
connection, self.connection = self.connection, None
if connection:
connection.channels.pop(channel_id, None)
connection._avail_channel_ids.append(channel_id)
try:
connection._used_channel_ids.remove(channel_id)
except ValueError:
# channel id already removed
pass
self.callbacks.clear()
self.cancel_callbacks.clear()
self.events.clear()
Expand Down
22 changes: 12 additions & 10 deletions amqp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def __init__(self, host='localhost:5672', userid='guest', password='guest',
self.on_unblocked = on_unblocked
self.on_open = ensure_promise(on_open)

self._avail_channel_ids = array('H', range(self.channel_max, 0, -1))
self._used_channel_ids = array('H')

# Properties set in the Start method
self.version_major = 0
Expand Down Expand Up @@ -482,18 +482,20 @@ def collect(self):
self._transport = self.connection = self.channels = None

def _get_free_channel_id(self):
try:
return self._avail_channel_ids.pop()
except IndexError:
raise ResourceError(
'No free channel ids, current={}, channel_max={}'.format(
len(self.channels), self.channel_max), spec.Channel.Open)
for channel_id in range(1, self.channel_max):
if channel_id not in self._used_channel_ids:
return channel_id

raise ResourceError(
'No free channel ids, current={}, channel_max={}'.format(
len(self.channels), self.channel_max), spec.Channel.Open)

def _claim_channel_id(self, channel_id):
try:
return self._avail_channel_ids.remove(channel_id)
except ValueError:
if channel_id in self._used_channel_ids:
raise ConnectionError(f'Channel {channel_id!r} already open')
else:
self._used_channel_ids.append(channel_id)
return channel_id

def channel(self, channel_id=None, callback=None):
"""Create new channel.
Expand Down
6 changes: 5 additions & 1 deletion t/unit/test_connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re
import socket
import warnings
from array import array
from unittest.mock import Mock, call, patch

import pytest
Expand Down Expand Up @@ -347,8 +348,11 @@ def test_collect_again(self):
self.conn.collect()
self.conn.collect()

def test_get_free_channel_id(self):
assert self.conn._get_free_channel_id() == 1

def test_get_free_channel_id__raises_IndexError(self):
self.conn._avail_channel_ids = []
self.conn._used_channel_ids = array('H', range(1, self.conn.channel_max))
with pytest.raises(ResourceError):
self.conn._get_free_channel_id()

Expand Down

0 comments on commit be6b5ed

Please sign in to comment.