Leaking connections when used with redis-py's asyncio.ConnectionPool #150

Closed · thevaizman opened this issue Jun 14, 2022 · 10 comments

@thevaizman

My setup:

Ubuntu 20.04.4 machine with Linux kernel 5.18
Dragonfly x86_64 version 0.3 alpha (bare metal)

Issue:

When running an async Redis client that uses pipelining to batch transactions, Dragonfly stops responding after a short period and simply hangs.
Inspecting the open connections, I found multiple ESTABLISHED connections to Dragonfly, all of them stalled.

The same code works fine against the latest redis-server, and the connections are closed properly.

Python code to reproduce the problem:

connection_pools = {
    "my_db": None,
    "my_db2": None,
}

connection_pools[db_name] = aioredis.ConnectionPool(
    host=config["hostname"], port=config["port"],
    db=config["databases"][db_name], password=config["password"],
    decode_responses=True, max_connections=16,
)

async def post_to_redis(my_list, my_list2, db_name):
    if connection_pools[db_name] is None:
        await init_pool(db_name)
    results = None
    try:
        redis_client = aioredis.Redis(connection_pool=connection_pools[db_name])
        async with redis_client.pipeline(transaction=True) as pipe:
            for k, v in list(my_list):
                pipe.hsetnx(name=k, key="name", value=v)
            for k, v in list(my_list2):
                pipe.hsetnx(name=k, key="name2", value=v)
            if len(pipe.command_stack) > 0:
                results = await pipe.execute()
    finally:
        await redis_client.close()
        return results

async def do_concurrent(db_name):
    tasks = []
    for i in range(1, 100):
        tasks.append(post_to_redis(my_list, my_list2, db_name))
    res = await asyncio.gather(*tasks)
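As context for the repro above: in redis-py, `Redis.close()` releases the connection back to the `ConnectionPool`, but the pooled sockets stay open until `ConnectionPool.disconnect()` is called or the pool is garbage-collected. A minimal pure-Python model of that distinction (illustrative only; `Pool`/`Connection` are invented names, not redis-py's actual implementation):

```python
class Connection:
    """Stands in for one TCP connection to the server."""
    def __init__(self) -> None:
        self.open = True

class Pool:
    """Toy model: 'release' keeps sockets open; only 'disconnect' closes them."""
    def __init__(self) -> None:
        self.idle: list[Connection] = []
        self.in_use: list[Connection] = []

    def get_connection(self) -> Connection:
        conn = self.idle.pop() if self.idle else Connection()
        self.in_use.append(conn)
        return conn

    def release(self, conn: Connection) -> None:
        # Roughly what Redis.close() amounts to: the socket stays ESTABLISHED.
        self.in_use.remove(conn)
        self.idle.append(conn)

    def disconnect(self) -> None:
        # Roughly what ConnectionPool.disconnect() does: close every socket.
        for conn in self.idle + self.in_use:
            conn.open = False
        self.idle.clear()
        self.in_use.clear()

pool = Pool()
c = pool.get_connection()
pool.release(c)
print(c.open)  # True: released back to the pool, socket still open
pool.disconnect()
print(c.open)  # False: now it is really closed
```

So even a client that calls `close()` on every code path leaves 16 ESTABLISHED sockets parked in the pool; that is expected pooling behavior, which is why hung (rather than merely open) connections point at the server side.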
@romange
Collaborator

romange commented Jun 14, 2022

Thanks @thevaizman.

I am not familiar with async functions in Python. How do I call do_concurrent from the main function?

@thevaizman
Author

thevaizman commented Jun 14, 2022

Hey, you will have to modify the provided code (especially the ConnectionPool bit) so that you can connect to your local Dragonfly DB instance.
Regarding calling async functions, you can add the following code to the file:

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_concurrent("my_db"))
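On Python 3.7+, `asyncio.run()` is the simpler equivalent: it creates and closes the event loop for you. A self-contained sketch (`do_concurrent_demo` is a stand-in for the real `do_concurrent`, which needs a live server):

```python
import asyncio

async def do_concurrent_demo() -> int:
    # Stand-in for do_concurrent("my_db"): fan out a batch of coroutines.
    async def unit(i: int) -> int:
        await asyncio.sleep(0)
        return i
    results = await asyncio.gather(*(unit(i) for i in range(5)))
    return sum(results)

# asyncio.run() replaces the get_event_loop()/run_until_complete() pair.
total = asyncio.run(do_concurrent_demo())
print(total)  # 10
```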

@romange
Collaborator

romange commented Jun 14, 2022

@thevaizman I think the init_pool function is missing, and so are my_list and my_list2.

@thevaizman
Author

It is missing, sorry, I had to redact some stuff since this is some production code of ours which I'm not keen to share ;)

async def init_pool(db_name):
    global connection_pools
    config = await read_config("redis_config")
    if connection_pools[db_name] is None:
        log.info("Will init ConnectionPool for {}".format(db_name))
        connection_pools[db_name] = aioredis.ConnectionPool(
            host=config["hostname"], port=config["port"],
            db=config["databases"][db_name], password=config["password"],
            decode_responses=True, max_connections=16,
        )

@thevaizman
Author

And for creating the lists, you can use whatever values since the values don't affect the outcome:

async def create_list():
    l = []
    for i in range(1, 100):
        l.append((i, "val"))
    return l

@romange
Collaborator

romange commented Jun 14, 2022

Hi Tal,

My final version of the script is below.
Can you confirm that this script stalls DF in your environment? If not, what's missing?

#!/usr/bin/env python3

import asyncio
import aioredis

connection_pools = {
    "my_db": None,
}

connection_pools["my_db"] = aioredis.ConnectionPool(host="localhost", port=6379,
                                                    db=1, decode_responses=True, max_connections=16)

my_list = [("key1_" + str(i), "val1_" + str(i)) for i in range(100)]
my_list2 = [("list1_" + str(i), "lval1_" + str(i)) for i in range(100)]

async def post_to_redis(my_list, my_list2, db_name):
    results = None
    try:
        redis_client = aioredis.Redis(connection_pool=connection_pools[db_name])
        async with redis_client.pipeline(transaction=True) as pipe:
            for k, v in list(my_list):
                pipe.hsetnx(name=k, key="name", value=v)
            for k, v in list(my_list2):
                pipe.hsetnx(name=k, key="name2", value=v)
            if len(pipe.command_stack) > 0:
                results = await pipe.execute()
    finally:
        await redis_client.close()
        return results


async def do_concurrent(db_name):
    tasks = []
    for i in range(1, 100):
        tasks.append(post_to_redis(my_list, my_list2, db_name))
    res = await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_concurrent("my_db"))

@thevaizman
Author

thevaizman commented Jun 14, 2022

Hey again, sorry: I forgot to mention that the keys need to be unique so that they actually get inserted into the DB. Also, the Dragonfly machine I'm using is remote (not sure if that matters).
Sorry for the confusion.

Anyway, I just confirmed that the following script indeed makes Dragonfly hang after ~20s:

from redis import asyncio as aioredis
import asyncio
from loguru import logger as log
import sys
import random

connection_pools = {
    "my_db": None,
}
connection_pools["my_db"] = aioredis.ConnectionPool(host="---------", port=6380,
                                                    db=1, password="-------", decode_responses=True, max_connections=16)

async def post_to_redis(sem, db_name):
    async with sem:
        results = None
        try:
            redis_client = aioredis.Redis(connection_pool=connection_pools[db_name])
            async with redis_client.pipeline(transaction=True) as pipe:
                for i in range(1, 100):
                    pipe.hsetnx(name=random.randint(1, 999999999), key="name", value=random.randint(1, 999999999))
                for i in range(1, 100):
                    pipe.hsetnx(name=random.randint(1, 999999999), key="name2", value=random.randint(1, 999999999))
                if len(pipe.command_stack) > 0:
                    results = await pipe.execute()
        finally:
            await redis_client.close()
            log.info(results)

async def do_concurrent(db_name):
    tasks = []
    sem = asyncio.Semaphore(10)
    for i in range(1, 100000):
        log.info("Adding - {}".format(i))
        tasks.append(post_to_redis(sem, db_name))
    res = await asyncio.gather(*tasks)
    for r in res:
        log.info(r)

if __name__ == '__main__':
    log.remove()
    log.add(sys.stdout, enqueue=True, level='INFO')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_concurrent("my_db"))
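The `asyncio.Semaphore(10)` in `do_concurrent` is what caps the fan-out at 10 in-flight pipelines, no matter how many tasks are queued. That bounding behavior can be checked without Redis; a pure-asyncio sketch (`worker`/`fan_out` are illustrative names):

```python
import asyncio

async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0)  # stand-in for the pipeline round-trip
        state["active"] -= 1

async def fan_out(n_tasks: int, limit: int) -> int:
    # Launch n_tasks workers, but the semaphore admits at most `limit` at once.
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(n_tasks)))
    return state["peak"]

peak = asyncio.run(fan_out(100, 10))
print(peak)  # at most 10, however many tasks were queued
```

So a hang here cannot be explained by unbounded client concurrency: at most 10 pipelines (and 16 pooled connections) are ever in flight.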

You can also observe how the connections are all ESTABLISHED and won't close until I forcefully kill the DF process:

[screenshot: connection listing showing the hung ESTABLISHED connections]

@romange
Collaborator

romange commented Jun 14, 2022

Thanks, I succeeded in reproducing it locally.

@romange
Collaborator

romange commented Jun 15, 2022

Dude, thanks for helping me reproduce this bug!
It's probably the toughest one I've handled in the last 3 months (still haven't solved it, btw).

@thevaizman
Author

Sure thing! Glad I could help. Thank you for the effort and in general for DF, can’t wait to use it in our production! :)

romange added a commit that referenced this issue Jun 15, 2022
In rare cases a scheduled transaction is not scheduled correctly and we need
to remove it from the tx-queue in order to re-schedule it. When we pull it from the tx-queue
and it was located at the head, we must poll-execute the next txs in the queue.

1. Fix the bug.
2. Improve verbose logging to make it easier to follow the tx flow in release mode.
3. Introduce a /txz handler that shows currently pending transactions in the queue.
4. Fix a typo in the xdel() function.
5. Add a py-script that reproduces the bug.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
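The head-of-queue point in the commit message can be illustrated with a toy model (pure-Python pseudocode, not Dragonfly's actual C++ implementation; `reschedule` and the `runnable` flag are invented names for illustration):

```python
from collections import deque

def reschedule(tx_queue: deque, bad_tx: dict) -> list:
    """Pull a mis-scheduled tx out of the queue for re-scheduling; if it sat
    at the head, poll-execute the transactions that become runnable behind it."""
    was_head = bool(tx_queue) and tx_queue[0] is bad_tx
    tx_queue.remove(bad_tx)  # removed so it can be re-scheduled later
    executed = []
    if was_head:
        # Without this drain, the queue head never advances and every
        # connection waiting behind it hangs; the symptom seen in this issue.
        while tx_queue and tx_queue[0]["runnable"]:
            executed.append(tx_queue.popleft())
    return executed

q = deque([{"id": 1, "runnable": False},
           {"id": 2, "runnable": True},
           {"id": 3, "runnable": False}])
done = reschedule(q, q[0])
print([tx["id"] for tx in done])  # [2]
```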