
Commit

Merge c7ddc27 into c13e336
grant-aterlo committed Aug 16, 2016
2 parents c13e336 + c7ddc27 commit fbbb230
Showing 3 changed files with 84 additions and 26 deletions.
34 changes: 25 additions & 9 deletions aiomcache/pool.py
@@ -14,9 +14,11 @@ def __init__(self, host, port, *, minsize, maxsize, loop=None):
         self._host = host
         self._port = port
         self._minsize = minsize
+        self._maxsize = maxsize
         self._loop = loop
         self._pool = asyncio.Queue(maxsize, loop=loop)
         self._in_use = set()
+        self._size = 0

     @asyncio.coroutine
     def clear(self):
@@ -26,6 +28,7 @@ def clear(self):
             self._do_close(conn)

     def _do_close(self, conn):
+        self._size -= 1
         conn.reader.feed_eof()
         conn.writer.close()

@@ -36,8 +39,12 @@ def acquire(self):
         :return: ``tuple`` (reader, writer)
         """
-        while self.size() < self._minsize:
+        while self._size < self._minsize:
             _conn = yield from self._create_new_conn()
+
+            # Could not create new connection
+            if _conn is None:
+                break
             yield from self._pool.put(_conn)

         conn = None
@@ -53,6 +60,9 @@ def acquire(self):
             if conn is None:
                 conn = yield from self._create_new_conn()

+            # Give up control
+            yield from asyncio.sleep(0, loop=self._loop)
+
         self._in_use.add(conn)
         return conn

@@ -66,16 +76,22 @@ def release(self, conn):
         if conn.reader.at_eof() or conn.reader.exception():
             self._do_close(conn)
         else:
-            try:
-                self._pool.put_nowait(conn)
-            except asyncio.QueueFull:
-                self._do_close(conn)
+            # This should never fail because poolsize=maxsize
+            self._pool.put_nowait(conn)

     @asyncio.coroutine
     def _create_new_conn(self):
-        reader, writer = yield from asyncio.open_connection(
-            self._host, self._port, loop=self._loop)
-        return _connection(reader, writer)
+        if self._size < self._maxsize:
+            self._size += 1
+            try:
+                reader, writer = yield from asyncio.open_connection(
+                    self._host, self._port, loop=self._loop)
+            except:
+                self._size -= 1
+                raise
+            return _connection(reader, writer)
+        else:
+            return None

     def size(self):
-        return len(self._in_use) + self._pool.qsize()
+        return self._size
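
Taken together, the pool.py changes make the pool track its size explicitly: _create_new_conn() refuses to open a socket once self._size reaches maxsize and returns None instead, and acquire() yields control (asyncio.sleep(0)) until another task releases a connection. Below is a minimal usage sketch of the bounded pool, written in the same pre-async/await asyncio style as the code above; the import path, host, and port are assumptions for illustration, not part of the diff.

import asyncio

from aiomcache.pool import MemcachePool  # assumed import path for aiomcache/pool.py


@asyncio.coroutine
def worker(pool, loop):
    # With maxsize connections already handed out, acquire() waits for a
    # release instead of opening yet another socket.
    conn = yield from pool.acquire()
    try:
        yield from asyncio.sleep(0.01, loop=loop)
    finally:
        pool.release(conn)


def main():
    loop = asyncio.get_event_loop()
    # Assumes a memcached server listening on localhost:11211.
    pool = MemcachePool("localhost", 11211, minsize=1, maxsize=2, loop=loop)
    loop.run_until_complete(
        asyncio.gather(*[worker(pool, loop) for _ in range(10)], loop=loop))
    assert pool.size() <= 2  # size() now reports self._size, never above maxsize
    loop.run_until_complete(pool.clear())


if __name__ == "__main__":
    main()

Ten workers share at most two connections; before this change nothing stopped acquire() from opening a new connection for every waiting task.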
3 changes: 2 additions & 1 deletion tests/conftest.py
@@ -239,7 +239,7 @@ def mcache_server(unused_port, docker, session_id):
     mcache_params = dict(host=host,
                          port=port)
     delay = 0.001
-    for i in range(100):
+    for i in range(10):
         try:
             conn = memcache.Client(
                 ['{host}:{port}'.format_map(mcache_params)])
@@ -253,6 +253,7 @@ def mcache_server(unused_port, docker, session_id):
     container['host'] = host
     container['port'] = port
     container['mcache_params'] = mcache_params
+    time.sleep(0.1)
     yield container

     docker.kill(container=container['Id'])
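
The conftest.py change shortens the memcached readiness loop (10 attempts instead of 100) and sleeps briefly before yielding the container so the dockerized server has a moment to settle. The body of that retry loop is collapsed in the diff; the snippet below is only a generic sketch of such a readiness check, using the standard library rather than the memcache client (host, port, and timings are illustrative).

import socket
import time


def wait_for_port(host, port, retries=10, delay=0.001):
    # Poll a TCP port, backing off between attempts, until it accepts connections.
    for _ in range(retries):
        try:
            socket.create_connection((host, port), timeout=1).close()
            return True
        except OSError:
            time.sleep(delay)
            delay *= 2
    return False


# Give the server a little extra time even after the port opens, much like the
# fixture's final time.sleep(0.1).
if wait_for_port("127.0.0.1", 11211):
    time.sleep(0.1)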
73 changes: 57 additions & 16 deletions tests/pool_test.py
@@ -43,22 +43,6 @@ def test_pool_clear(mcache_params, loop):
     assert pool._pool.qsize() == 0


-@pytest.mark.run_loop
-def test_pool_is_full(mcache_params, loop):
-    pool = MemcachePool(minsize=1, maxsize=2, loop=loop, **mcache_params)
-    conn = yield from pool.acquire()
-
-    # put garbage to the pool make it look like full
-    mocked_conns = [_connection(0, 0), _connection(1, 1)]
-    yield from pool._pool.put(mocked_conns[0])
-    yield from pool._pool.put(mocked_conns[1])
-
-    # try to return connection back
-    assert pool.size() == 3
-    pool.release(conn)
-    assert pool.size() == 2
-
-
 @pytest.mark.run_loop
 def test_acquire_dont_create_new_connection_if_have_conn_in_pool(mcache_params,
                                                                  loop):
@@ -73,3 +57,60 @@ def test_acquire_dont_create_new_connection_if_have_conn_in_pool(mcache_params,
     conn = yield from pool.acquire()
     assert conn is _conn
     assert pool.size() == 1
+
+
+@pytest.mark.run_loop
+def test_acquire_limit_maxsize(mcache_params,
+                               loop):
+    pool = MemcachePool(minsize=1, maxsize=1, loop=loop, **mcache_params)
+    assert pool.size() == 0
+
+    # Create up to max connections
+    _conn = yield from pool.acquire()
+    assert pool.size() == 1
+    pool.release(_conn)
+
+    @asyncio.coroutine
+    def acquire_wait_release():
+        conn = yield from pool.acquire()
+        assert conn is _conn
+        yield from asyncio.sleep(0.01, loop=loop)
+        assert len(pool._in_use) == 1
+        assert pool.size() == 1
+        assert pool._pool.qsize() == 0
+        pool.release(conn)
+
+    yield from asyncio.gather(*([acquire_wait_release()] * 3), loop=loop)
+    assert pool.size() == 1
+    assert len(pool._in_use) == 0
+
+
+@pytest.mark.run_loop
+def test_maxsize_greater_than_minsize(mcache_params, loop):
+    pool = MemcachePool(minsize=5, maxsize=1, loop=loop, **mcache_params)
+    conn = yield from pool.acquire()
+    assert isinstance(conn.reader, asyncio.StreamReader)
+    assert isinstance(conn.writer, asyncio.StreamWriter)
+    pool.release(conn)
+
+
+@pytest.mark.run_loop
+def test_0_minsize(mcache_params, loop):
+    pool = MemcachePool(minsize=0, maxsize=5, loop=loop, **mcache_params)
+    conn = yield from pool.acquire()
+    assert isinstance(conn.reader, asyncio.StreamReader)
+    assert isinstance(conn.writer, asyncio.StreamWriter)
+    pool.release(conn)
+
+
+@pytest.mark.run_loop
+def test_bad_connection(mcache_params, loop):
+    pool = MemcachePool(minsize=5, maxsize=1, loop=loop, **mcache_params)
+    pool._host = "INVALID_HOST"
+    assert pool.size() == 0
+    with pytest.raises(Exception):
+        conn = yield from pool.acquire()
+        assert isinstance(conn.reader, asyncio.StreamReader)
+        assert isinstance(conn.writer, asyncio.StreamWriter)
+        pool.release(conn)
+    assert pool.size() == 0
