
aiomcache concurrency issue #196

Closed
achedeuzot opened this issue Apr 21, 2017 · 18 comments

Comments

@achedeuzot

achedeuzot commented Apr 21, 2017

I'm having a strange issue with wait_for(fut, timeout, *, loop=None) + aiocache on memcache.

We're storing values using aiocache.MemcachedCache, and most aiocache methods are decorated with @API.timeout, which uses await asyncio.wait_for(fn(self, *args, **kwargs), timeout) (with a default timeout of 5 seconds).

When load testing our application, we see that under heavy load the asyncio loop clogs up and some requests to memcache raise asyncio.TimeoutError, which is perfectly acceptable. The issue is that when we stop the load and allow the loop to catch up, if we make any new request, all the memcache connections fail with a class 'concurrent.futures._base.TimeoutError'. In other words, if we ever get a TimeoutError, the application cache is completely broken, and the only way to repair the application is to kill it and restart it, which is unacceptable. It seems as though the whole aiocache connection pool gets closed, and I can't find where this happens or how to prevent it.

I've tried the following:

  • Removed uvloop (just in case).
  • Wrapped asyncio.wait_for() in shield() so it wouldn't cancel the associated Task: no difference.
  • Tried catching the following error types: asyncio.CancelledError, TimeoutError, asyncio.futures.TimeoutError, asyncio.TimeoutError, and even a blanket Exception, with no success; it seems my handling of the error comes too late.
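For reference on the shield() attempt: shield() only protects the inner task from being cancelled; asyncio.wait_for() still raises TimeoutError in the caller, but the shielded work keeps running and can complete later. A minimal standalone sketch (not using aiocache):

```python
import asyncio

async def slow_op():
    # stand-in for a slow cache call
    await asyncio.sleep(0.2)
    return "done"

async def main():
    task = asyncio.ensure_future(slow_op())
    try:
        # wait_for still raises TimeoutError, but shield() prevents
        # it from cancelling the underlying task
        await asyncio.wait_for(asyncio.shield(task), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    # the shielded task keeps running and can still complete
    return await task

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # prints: done
loop.close()
```

Whether this helps depends on the caller also keeping a reference to the task and letting it finish, which may not match how the decorator in aiocache invokes wait_for.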

The only thing that helps is increasing the connection pool size (from the default of 2 to, say, 500), but even with a big pool, once a TimeoutError occurs we hit the same issue and the whole pool spins into everlasting errors.
Finally, if I disable the timeout by setting it to 0 or None, the library uses a simple await fn() instead of asyncio.wait_for(); even though we see some slowness under load, there is no TimeoutError and the application keeps working. But waiting indefinitely on the cache is not a good idea, so I'd really like to use the timeout feature.

If anyone has any idea how to tackle this, I'd love to hear your input!
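To make the suspected failure mode concrete: when asyncio.wait_for() times out, it cancels the inner coroutine at whatever await point it happens to be suspended on, which is exactly when a checked-out pool connection can be orphaned. A minimal sketch of the cancellation sequence (no aiocache involved; the sleep is a stand-in for the network I/O):

```python
import asyncio

events = []

async def cache_get():
    # stand-in for aiocache's get(); the sleep represents the network I/O
    events.append("started")
    try:
        await asyncio.sleep(0.2)
        events.append("finished")      # never reached on timeout
    except asyncio.CancelledError:
        events.append("cancelled")     # wait_for cancels us mid-operation
        raise

async def main():
    try:
        await asyncio.wait_for(cache_get(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout")
    return events

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # ['started', 'cancelled', 'timeout']
loop.close()
```

Any cleanup (such as returning a connection to a pool) that sits after the cancelled await, rather than in a finally block, simply never runs.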

The versions involved:

  • python 3.5.3
  • uvloop 0.8.0
  • aiohttp 2.0.7
  • aiocache 0.3.3
  • aiomcache 0.5.1

I'm currently writing a small test case to see if I can easily reproduce the issue. I'll post it here when it's done.

achedeuzot changed the title to "if aiocache hits a TimeoutError it doesn't recover" Apr 21, 2017
@argaen
Member

argaen commented Apr 21, 2017

Hi @achedeuzot, I'll investigate this over the weekend. A small test case would definitely help; if not, I'll try to build something myself.

@argaen
Member

argaen commented Apr 21, 2017

Hmm, I tried a really simple example to see if it recovers, and it seems it does (I hacked the aiomcache code to add an asyncio.sleep(2) in the get command to simulate high latency, so the timeout triggers for the first couple of requests):

import asyncio

from aiocache import MemcachedCache


client = MemcachedCache(pool_size=1)

timeout = 0


async def get(key):
    try:
        global timeout
        timeout += 1
        return await asyncio.wait_for(client.get(key), timeout=timeout)
    except asyncio.TimeoutError:
        print("Timeout Error")


loop = asyncio.get_event_loop()
print(loop.run_until_complete(client.set("Hello", "World")))
for x in range(5):
    print(loop.run_until_complete(get("Hello")))

Output:

True
Timeout Error
None
Timeout Error
None
World
World
World

I did this simple one to check whether aiomcache releases the pool connection correctly when there is an asyncio.TimeoutError. It seems it does, because the last three queries return the value.

The example is not accurate because in real code the cancellation occurs while the connection is being used, whereas here it happens during the sleep. I'll see if I can adjust it to reproduce that.

@argaen
Member

argaen commented Apr 22, 2017

I've done another test simulating a slow network with sudo tc qdisc add dev lo root netem delay 2000ms, which adds a two-second delay to traffic on the loopback interface (you can remove the rule with sudo tc qdisc del dev lo root).

The updated example:

import asyncio

from aiomcache import Client
from aiocache import MemcachedCache

MAX_TIMEOUT = 100

client = MemcachedCache(pool_size=1)


async def get(key, timeout):
    try:
        print("Calling client get with timeout {}, pool size {}".format(timeout, client.client._pool.size()))
        return await client.get(key, timeout=timeout)
    except asyncio.TimeoutError:
        print("Timeout Error")


loop = asyncio.get_event_loop()
for x in range(1, MAX_TIMEOUT, 2):
    print(loop.run_until_complete(get("Hello", x)))

and the output (I've added some print lines inside the aiomcache code to see where it was being cancelled; no sleeps in the middle this time):

Calling client get with timeout 1, pool size 0
Timeout Error
None
Calling client get with timeout 3, pool size 0
Timeout Error
None
Calling client get with timeout 5, pool size 0
aiomcache: received get b'Hello'
aiomcache: Wrote command
Timeout Error
None
Calling client get with timeout 7, pool size 0
aiomcache: received get b'Hello'
aiomcache: Wrote command
Timeout Error
None
Calling client get with timeout 9, pool size 0
aiomcache: received get b'Hello'
aiomcache: Wrote command
aiomcache: Read command: b'VALUE Hello 0 5 35\r\n'
aiomcache: retrieved get b'World'
World
Calling client get with timeout 11, pool size 1
aiomcache: received get b'Hello'
aiomcache: Wrote command
aiomcache: Read command: b'VALUE Hello 0 5 35\r\n'
aiomcache: retrieved get b'World'
World

Note that although the delay is only 2s, it takes much longer than that to receive the value. This is because multiple loopback round-trips are made (i.e. acquire connection, write, read) and the 2s delay applies to each of them. Nevertheless it ends up resolving correctly and recovering.

@achedeuzot
Author

achedeuzot commented Apr 22, 2017

Hi Manuel,

I've tried creating a simple test case as you did but didn't manage to reproduce it easily. But I have more information about the circumstances in which I see this behavior.

I'm using aiohttp (v2.0.7) for serving HTTP requests and aiocache for caching some sub-requests to micro-services. When the server starts, I initialize aiocache.MemcachedCache in a custom CacheManager class which I then register in aiohttp's Application global context which is passed to each HTTP handler. So each handler uses the global app['cache'] instance to make get() and set() operations on the cache.

What I noticed is that if I use a try: ... except asyncio.TimeoutError: ... in my CacheManager class (which adds a small wrapper around aiocache), the TimeoutError is not caught, as if the CacheManager were destroyed in the process of the TimeoutError and/or the cancellation of the associated asyncio.Task. But if I use the same try/except structure in my HTTP handler, I do catch the TimeoutError!

class CacheManager:
    def __init__(self):
        self.cache = aiocache.MemcachedCache(...)

    def get(self, key):
        # try: ... except asyncio.TimeoutError: here doesn't work!
        return self.cache.get(key)

    def set(self, key, value):
        return self.cache.set(key, value)

async def handler_get(req):
    # try: ... except asyncio.TimeoutError: here works!
    c = await req.app['cache'].get('...')
    if c:
        return c
    return compute_stuff(...)

if __name__ == '__main__':
    app = aiohttp.web.Application()
    app['cache'] = CacheManager()
    app.router.add_route('GET', '/', handler_get)
    aiohttp.web.run_app(app)

Under load, it seems that aiohttp spawns multiple Tasks which hold a reference to the global CacheManager containing the aiocache.MemcachedCache instance. When one of the HTTP handlers catches an asyncio.TimeoutError, the sub-Task has already been cancelled, and the aiocache.MemcachedCache sockets to memcache seem to have been closed as well. So even when I catch the error in the HTTP handler, it's too late, and all subsequent calls to app['cache'] get() or set() fail.

What I've currently done to 'fix' the issue is to catch the asyncio.TimeoutError in each HTTP handler and then create a new instance of CacheManager and re-register it into req.app['cache']. But I see in my logs that when the asyncio.TimeoutError happens, multiple CacheManagers get re-created. As we've increased the connection pool size to 500, this is a dirty solution: I can't control how many CacheManagers are spawned (500 connections x n...), and I lose the whole purpose of having a shared connection pool. So this is not the right approach. Do you have any suggestions and/or ideas for how I could give each handler the same CacheManager, without it being destroyed on asyncio.TimeoutError, so I can use the connection pool and keep a single CacheManager instance at any given time?
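One way to avoid spawning many CacheManagers is to serialize the re-creation behind an asyncio.Lock, so only the first handler that sees the broken instance rebuilds it. A hedged sketch (CacheManagerHolder is a hypothetical helper, not part of aiocache or aiohttp):

```python
import asyncio

class CacheManagerHolder:
    """Hypothetical wrapper: rebuild the manager once, not once per handler."""
    def __init__(self, factory):
        self._factory = factory
        self._lock = asyncio.Lock()
        self.manager = factory()

    async def reset(self, broken):
        async with self._lock:
            # only the first caller actually rebuilds; later callers
            # see that `manager` is no longer the broken instance
            if self.manager is broken:
                self.manager = self._factory()
            return self.manager

async def main():
    holder = CacheManagerHolder(object)   # object() stands in for CacheManager
    broken = holder.manager
    managers = await asyncio.gather(*(holder.reset(broken) for _ in range(5)))
    # all five concurrent resets ended up with the same single new manager
    return len({id(m) for m in managers})

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # prints: 1
loop.close()
```

A handler would then pass the instance it believes is broken to reset() after catching the timeout, instead of constructing a fresh CacheManager itself. This caps re-creation at one instance per failure, though it doesn't address the underlying pool leak.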

Thank you very much for your help with this issue! :)

@argaen
Member

argaen commented Apr 22, 2017

Do you have the script you use to generate the load, so I can reproduce the error in the example you've posted? I'd like to play with it a bit and see it for myself.

@achedeuzot
Author

achedeuzot commented Apr 24, 2017

Hi Manuel,

I managed to reproduce it with the following script:

import random
import string
import logging
import asyncio
import aiohttp
import aiocache

from aiohttp import web

logger = logging.getLogger(__name__)

class CacheManager:
    def __init__(self):
        self.cache = aiocache.MemcachedCache(port=11211, pool_size=2)

    def get(self, key):
        try:
            return self.cache.get(key, timeout=0.1)
        except asyncio.TimeoutError as e:
            logger.error("CacheManager.get exception:")
            logger.error(e)
        return None

    def set(self, key, value):
        return self.cache.set(key, value)

async def handler_post(req):
    try:
        data = await req.app['cache'].get('testkey')
        if data:
            return web.Response(text=data)
    except asyncio.TimeoutError as e:
        logger.error("handler_post exception:")
        logger.error(e)

    data = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1024))
    await req.app['cache'].set('testkey', data)
    return web.Response(text=data)

if __name__ == '__main__':
    app = web.Application()
    app['cache'] = CacheManager()
    app.router.add_route('POST', '/', handler_post)
    web.run_app(app)

The pool size is small (2) and the timeout is low (0.1) so that the asyncio.TimeoutError occurs. Once it has occurred, the log fills up with ERROR __main__(33) | handler_post exception: and even if I wait a few minutes and retry the benchmark, all requests fail. Also, the asyncio.TimeoutError is never caught inside the CacheManager, as seen before.

The load is sent using ab: ab -n 500 -c 36 -H "Content-Type: application/json" -p /Users/achedeuzot/payload.json http://127.0.0.1:8080/

I created a repo with Dockerfile + docker-compose to easily spawn the server here: https://github.com/achedeuzot/aiocache-test1

Thanks for your feedback if you manage to reproduce the same results as I did.

@achedeuzot
Author

achedeuzot commented Apr 24, 2017

So, continuing the investigation, I can now catch the asyncio.TimeoutError successfully. But the issue of connections freezing still persists. I noticed that when the asyncio.TimeoutError occurs, the method aiomcache.pool.release is not called anymore, so it seems the connections that are interrupted by the timeout are never released.

I also noticed that the Memcached connection pool (from aiomcache.MemcachePool) looks like this: '_pool': <Queue at 0x7b5913c1ba90 maxsize=2 tasks=2307>. I didn't find a way to get hold of the connection in use when the timeout occurs so that I could force-release it.

If I increase the connection pool size, this error occurs a lot less often but can still happen. I also noticed that, on average, a connection that was in use during a timeout needs approximately 10 minutes to be closed / garbage-collected. After waiting 10 minutes, I can make requests again without issues (until a new timeout occurs).

One final observation: if I do only set(), there is no asyncio.TimeoutError, but if I do only get(), I do get the exception, so there must be a difference in implementation between the two methods that creates the bad situation.

@argaen
Member

argaen commented Apr 24, 2017

Huh, that's interesting, and nice progress. Being able to catch the exception makes much more sense; the situation you were describing before sounded really weird. Did you have to change anything regarding the catching of the exception?

Regarding release not being called, I'll check this afternoon too. There must be a bug somewhere, because in the test script I posted in a previous comment the pool was recovering fine.

PS: I don't know if it's the case, but this starts to mark the path for this being a bug in aiomcache more than in aiocache (although I will solve it anyway, ofc :P).

@achedeuzot
Author

achedeuzot commented Apr 24, 2017

Yep, I had to await the return of self.cache.get(key) for the exception to be caught. Otherwise I was returning a coroutine and had already exited the function before the asyncio.TimeoutError could be caught. So this is a non-issue, just a big gotcha.
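The gotcha generalizes: calling a coroutine function only creates a coroutine object, so a try/except around the bare call can never catch exceptions that are raised when the coroutine actually runs, i.e. when someone awaits it. A standalone illustration (failing_get is a made-up stand-in for the cache call):

```python
import asyncio

async def failing_get():
    # stand-in for a cache call that times out
    raise asyncio.TimeoutError

def get_without_await():
    try:
        return failing_get()       # just builds a coroutine object; no code runs
    except asyncio.TimeoutError:
        return "caught inside"     # unreachable: the exception fires later,
                                   # in whoever awaits the returned coroutine

async def get_with_await():
    try:
        return await failing_get() # exception surfaces at this await
    except asyncio.TimeoutError:
        return "caught inside"

async def main():
    try:
        await get_without_await()  # TimeoutError escapes to the caller here
        outer = "not raised"
    except asyncio.TimeoutError:
        outer = "caught in caller"
    inner = await get_with_await()
    return outer, inner

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # ('caught in caller', 'caught inside')
loop.close()
```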

I don't know if it's an aiomcache issue in itself, because aiomcache doesn't do any timeout management at all; the asyncio.wait_for() wrapping comes from aiocache.

@argaen
Member

argaen commented Apr 24, 2017

Yeah, but the user can still use asyncio.wait_for when calling aiomcache directly. Indeed, the problem comes from aiomcache:

import random
import string
import logging
import asyncio
import aiomcache

from aiohttp import web

logger = logging.getLogger(__name__)

class CacheManager:
    def __init__(self):
        self.cache = aiomcache.Client("127.0.0.1", 11211, pool_size=2)

    async def get(self, key):
        return await asyncio.wait_for(self.cache.get(key), 0.1)

    async def set(self, key, value):
        return await asyncio.wait_for(self.cache.set(key, value), 0.1)

async def handler_post(req):
    try:
        data = await req.app['cache'].get(b'testkey')
        if data:
            return web.Response(text=data.decode())
    except asyncio.TimeoutError as e:
        logger.error("handler_post exception:")
        logger.error(e)

    data = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1024))
    await req.app['cache'].set(b'testkey', data.encode())
    return web.Response(text=data)

if __name__ == '__main__':
    app = web.Application()
    app['cache'] = CacheManager()
    app.router.add_route('GET', '/', handler_post)
    web.run_app(app)

I've changed the route to a GET one to make load testing easier: ab -n 1500 -c 60 http://127.0.0.1:8080/

Debugging aiomcache, I see it's exiting at https://github.com/aio-libs/aiomcache/blob/master/aiomcache/pool.py#L55 when the exception is raised. Still on it to see whether this is expected or not... @fafhrd91 @asvetlov any ideas?

TL;DR: It seems aiomcache doesn't deal well with tasks being cancelled due to a TimeoutError under high load.
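The diagnosis can be reproduced in miniature: if a Queue-backed pool checks a connection out and the task is cancelled before the release runs, and the release is not inside a finally block, the slot is lost for good. A toy model (LeakyPool is illustrative, not aiomcache's actual code):

```python
import asyncio

class LeakyPool:
    """Toy Queue-backed pool that loses its slot when cancelled."""
    def __init__(self):
        self._q = asyncio.Queue()
        self._q.put_nowait("conn")

    async def work(self, delay):
        conn = await self._q.get()   # check the connection out
        await asyncio.sleep(delay)   # cancellation lands here...
        self._q.put_nowait(conn)     # ...so this release never runs

async def main():
    pool = LeakyPool()
    try:
        await asyncio.wait_for(pool.work(0.2), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    # the pool's only connection is gone; qsize() exposes the leak
    return pool._q.qsize()

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # prints: 0
loop.close()
```

Moving the put_nowait into a try/finally around the I/O returns the slot even when the task is cancelled, which is the kind of fix the acquire/release path seems to need.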

@achedeuzot
Author

So you managed to reproduce the issue? That's good news! Now we need to find out why this happens and how to fix it :) Thank you so much for your cooperation on this!

@argaen
Member

argaen commented Apr 24, 2017

Yup, sorry, my previous comment was wrong. I've managed to reproduce it both using aiocache and using just aiomcache (my example uses just aiomcache).

No problem, checking it because aiocache really depends on aiomcache :P

@argaen
Member

argaen commented Apr 24, 2017

Just for the record, it seems the same example with RedisCache (which uses aioredis as the connector) recovers fine; no extra work needed.

@fafhrd91
Member

Check if aioredis uses the streams API.

@achedeuzot
Author

@fafhrd91 It seems that is indeed the case, because it uses asyncio.open_connection.

@argaen
Member

argaen commented Apr 25, 2017

Yup, it uses asyncio.open_connection, but the way it manages connections is different: https://github.com/aio-libs/aioredis/blob/master/aioredis/pool.py#L50.

Dunno if that's the main problem, but deque is thread-safe while asyncio.Queue is not.

argaen changed the title from "if aiocache hits a TimeoutError it doesn't recover" to "aiomcache concurrency issue" May 1, 2017
@argaen
Member

argaen commented May 1, 2017

@achedeuzot I'm closing this issue because there is nothing that can be done in aiocache to fix this. This week I'll start working on aiomcache to fix it there.

@argaen argaen closed this as completed May 1, 2017
@achedeuzot
Author

@argaen Great! :D
