
multiprocess+gevent + redis become slower #1147

Closed
631068264 opened this issue Mar 12, 2019 · 5 comments


631068264 commented Mar 12, 2019

Version: redis==3.2.0 gevent==1.4.0

Platform: Python 2.7, CentOS Linux release 7.4.1708

Description:
This is the finish-time log for the synchronous code; it gets the data very fast.

More logs are attached in kline.zip.

kline-log    Tue, 12 Mar 2019 01:41:03 ERROR    Total time running [coinbell_bc_btc_15] redis update_aug: 0.0138018131256 seconds
kline-log    Tue, 12 Mar 2019 01:41:03 ERROR    Total time running [coinbell_bc_btc_15] total kline aug: 0.0759608745575 seconds

This is the finish-time log for the asynchronous code; getting the data costs more than 1 second, and sometimes 4 or 5 seconds.

kline-log    Tue, 12 Mar 2019 01:51:03 ERROR    Total time running [coinbell_bc_btc_15] redis update_aug: 1.5217051506 seconds
kline-log    Tue, 12 Mar 2019 01:51:04 ERROR    Total time running [coinbell_bc_btc_15] total kline aug: 1.61521601677 seconds

My redis config kwargs are just the connection info: host, db, port, and password. Whenever I use redis, I fetch the same redis instance by key from redis_pools.

redis_max_connections = 4

def __redis_conn(**kwargs):
    return redis.StrictRedis(connection_pool=redis.ConnectionPool(
        max_connections=config.redis_max_connections,
        # socket_timeout=config.redis_socket_timeout,
        socket_keepalive=True,
        **kwargs
    ))

def _init_redis(platform_name, key, **kwargs):
    redis_pools['{}_{}'.format(platform_name, key)] = __redis_conn(**kwargs)


redis_pools = {}

for platform_name, setting in config.redis_config.iteritems():
    for key, c in setting.iteritems():
        _init_redis(platform_name.lower(), key.lower(), **c)


def _get_redis(platform_name, key):
    return redis_pools['{}_{}'.format(platform_name.lower(), key.lower())]


save_redis = partial(_get_redis, key="save")
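
For reference, config.redis_config is assumed to be a nested mapping of platform name to per-key connection settings; the layout below is a hypothetical illustration (the platform/key names and connection details are made up), matching the lookup used by _get_redis:

# Hypothetical layout of config.redis_config; platform/key names and
# connection details here are illustrative only.
redis_config = {
    'Coinbell': {
        'save': {'host': '127.0.0.1', 'port': 6379, 'db': 0, 'password': 'secret'},
        'src': {'host': '127.0.0.1', 'port': 6379, 'db': 1, 'password': 'secret'},
    },
}

# _get_redis('coinbell', 'save') then returns the pooled client registered under
# the 'coinbell_save' key, and save_redis('coinbell') is a shortcut for it.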

My synchronous code:

def kline(mod_name='coinbell', run_key='kline_aug:1w', table_name=None):
    while True:
        with t_with_time('finish kline aug', logger.get('kline-log').error):
            mod = util.import_mod("fibo_spider.kline_spiders.%s" % mod_name)
            spider = mod.spiders
            robot = spider(run_key)

            for s in robot:
                flag = True if table_name is None else s.table_name == table_name
                if flag:
                    print s
                    with t_with_time('{} finish kline aug'.format(s.table_name), logger.get('kline-log').error):
                        s.run()

My asynchronous code: I use gevent.pool inside multiprocessing to speed things up.

@func_time(logger.get("kline-log").error, "pool")
def pool_func(func, func_list):
    pool = Pool(50)
    try:
        for func_part in func_list:
            pool.add(gevent.spawn(func, func_part))
        pool.join()
    except Exception as e:
        raise


@func_time(logger.get("kline-log").error, "process")
def multi_process(func_part_list, process_num, run_func):
    if func_part_list:
        random.shuffle(func_part_list)
        process = []
        for func_part in util.split_list(func_part_list, process_num):
            # speed up the aggregation
            p = Process(target=pool_func, args=(run_func, func_part))
            process.append(p)
            p.start()
        for pro in process:
            pro.join()
        del process[:]


def run(tar):
    tar.run()


def run_kline(program_name, func, key, module_list, process_num=8, run_func=run, wait=5):
    while True:
        try:
            with t_with_time('update-yaml', logger.get('kline-log').error):
                ParseYaml.update_coin_config(program_name)
            with t_with_time('update-func', logger.get('kline-log').error):
                func_par_list = func(key, module_list)
            if func_par_list:
                multi_process(func_par_list, process_num, run_func)
                logger.get("kline-log").info("{} multi_process finish".format(key))
                time.sleep(wait)
            else:
                time.sleep(4)
        except Exception as e:
            logger.get("kline-log").error(util.error_msg())

def get_kline_spider(key, module_list):
    k = []
    for name in module_list:
        with util.reload_mod(IMPORT_PATH_FORMAT % name) as mod:
            k_spider = getattr(mod, 'spiders', None)
            if k_spider:
                k.extend(k_spider(key))
    return k


@cli.command()
@click.argument('program_name')
@click.argument('key')
def aug(program_name, key):
    """更新K线到redis(全量增量)"""
    module_list = [program_name, ]
    spider_key = "kline_aug:{}".format(key)
    run_kline(program_name, get_kline_spider, spider_key, module_list, process_num=2, wait=1)

The important code is just the run method:

class KlineAug(BaseRedis):
    """更新K线到redis"""
    max_k_period = config.max_k_period

    # db = dao.kline()

    def __init__(self, platform_name, table_name, key, field=None, parse_data_func=None, run_key=None):
        super(KlineAug, self).__init__(platform_name, table_name, key, field, run_key=run_key)
        self.parse_data_func = parse_data_func
        self.key = util.redis_key(const.REDIS_KEY_PREFIX.KLINE, self.table_name[:self.table_name.rfind('_')])
        self.redis = dao.save_redis(platform_name)
        self.db = dao.kline(platform_name)
        self.info_db = dao.info(platform_name)

        self.stash = Stash('kline_{}_aug'.format(table_name))
        self.ticker_last_tid_key = 'trade_last_tid'
        self.update_kline_dt_flag = 'kline_update_dt'

    def _update_aug(self):
        key = self.src_key.replace('kline', 'trades')
        with t_with_time('[%s] redis update_aug' % (self.table_name,), logger.get('kline-log').error):
            trade = util.json_loads(util.decompress(self.src_redis.get(key)))
        if trade:
            last_tid = self.stash.get(self.ticker_last_tid_key)
            new_trade = trade[0]
            logger.get('kline-log').error('[{}] {} {}'.format(self.table_name, last_tid, new_trade))
            return last_tid != new_trade['tid'], new_trade
        return False, None

    def run(self, params=None):
        # data = self.src_redis.hget(self.src_key, self.src_field)
        # rows = util.json_loads(util.decompress(data))
        with t_with_time('[%s] total kline aug' % (self.table_name,), logger.get('kline-log').error):
            update_dt = self.stash.get(self.update_kline_dt_flag)
            if update_dt and not util.update_limit_dt(update_dt, config.kline_wait_sec):
                logger.get('kline-log').error(
                    '[{}] {} {}'.format(self.table_name, update_dt, config.kline_wait_sec))
                return
            is_updated, new_trade = self._update_aug()
            try:
                if is_updated:
                    with t_with_time('[%s] kline aug get db' % (self.table_name,), logger.get('kline-log').error):
                        rows = QS(self.db).table(getattr(T, self.table_name)).order_by(
                            F.timestamp, desc=True).limit(0, self.max_k_period).select('*')
                    if rows:
                        # rows = self.parse_data(data[-self.max_k_period:])
                        df = pd.DataFrame(rows)
                        df.sort_values('timestamp', inplace=True)
                        # df = df.iloc[-self.max_k_period:]
                        df['timestamp'] = df['timestamp'].apply(lambda x: x * 1000)
                        self.insert_data(df)
                        logger.get('kline-log').error(
                            '[{}] {} {}'.format(self.table_name, rows[-1]['close'], new_trade['price']))
                        if new_trade and rows[-1]['close'] == util.safe_decimal(new_trade['price']):
                            self.stash[self.ticker_last_tid_key] = new_trade['tid']
                            self.stash[self.update_kline_dt_flag] = util.nowdt()
                    else:
                        self.none_data()

                self.insert_info()
            except Exception:
                logger.get('kline-log').error(util.error_msg())

    def insert_info(self):
        if self.run_key:
            db = self.info_db
            # data_type, _ = self.src_key.split(':')
            platform, symbola, symbolb, _ = self.table_name.split('_')
            with transaction(db) as trans:
                QS(db).table(T.collect_info).insert({
                    'type': self.run_key.lower(),
                    'symbol': '{}_{}'.format(symbola, symbolb).lower(),
                    'time': util.now(),
                    'platform': platform.lower(),
                }, on_duplicate_key_update={
                    'time': util.now(),
                })
                trans.finish()

    def insert_data(self, df):
        last_key = str(self.src_field) + ":last"
        minute = self.src_field
        minute_data = util.compress(self._df2json(df))
        last_key_data = self._df2json(df.iloc[-1])
        with t_with_time('[%s] kline aug send redis' % (self.table_name,),
                         logger.get('kline-log').error):
            self.redis.hmset(self.key, {
                minute: minute_data,
                last_key: last_key_data,
            })

    def _df2json(cls, df):
        rows = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
        return util.json_dumps(rows, default=str)

    def parse_data(self, data):
        return self.parse_data_func(data)

    def none_data(self):
        self.redis.delete(self.key)

631068264 changed the title from "gevent + redis become slower" to "multiprocess+gevent + redis become slower" on Mar 12, 2019
@andymccurdy (Contributor)

Thanks for the report. Have you tried the same benchmark with prior versions of redis-py? Was the gevent version faster when testing in prior versions?

@yevyevyev

I have the same issue, @631068264 did you solve it?


gabomasi commented Feb 12, 2020

Any solution?
redis==2.10.5
gevent==1.0.2
'CONNECTION_POOL_CLASS': 'redis.BlockingConnectionPool'

NewRelic report: [screenshot: 2020-02-12 10-08-20]

@gabomasi

You can solve the issue by setting the queue_class param on redis.BlockingConnectionPool to the Queue class provided by gevent:

class redis.BlockingConnectionPool(max_connections=50, timeout=20, connection_class=<class 'redis.connection.Connection'>, queue_class=<class 'queue.LifoQueue'>, **connection_kwargs)

from gevent.queue import Queue
from redis import BlockingConnectionPool

BlockingConnectionPool(queue_class=Queue)

There are multiple queue classes; choose the one you like most:
http://www.gevent.org/api/gevent.queue.html
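
For completeness, a minimal sketch of the workaround under gevent (host, port, pool size, and timeout below are placeholder values, not settings from this thread). It presumably helps because a greenlet waiting for a free connection then yields to the gevent hub instead of blocking on the stdlib LifoQueue primitive:

# Minimal sketch of the workaround described above. host/port/max_connections/
# timeout are placeholders, not values taken from this issue.
import redis
from gevent.queue import Queue

pool = redis.BlockingConnectionPool(
    host='127.0.0.1',
    port=6379,
    max_connections=50,
    timeout=20,
    queue_class=Queue,  # gevent-aware queue instead of the default LifoQueue
)
client = redis.StrictRedis(connection_pool=pool)
client.ping()

gevent.queue also provides a LifoQueue if you prefer to keep the default last-in-first-out connection reuse order.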

@andymccurdy (Contributor)

Wonderful. Glad someone figured out a workaround. Thanks @gabomasi
