Need a limited Threadpool in consumer side #2013

Closed
Jaskey opened this issue Jul 2, 2018 · 13 comments

@Jaskey commented Jul 2, 2018

Environment

  • Dubbo version: 2.5.3
  • Java version: 1.7

In some circumstances, too many threads are created and the process eventually suffers from OOM. Here is the related issue: #1932

After analyzing the problem, I found the root cause: the consumer side uses a cached thread pool with no limit on its thread count. So users need a way to cap the maximum number of consumer threads.
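
To make the root cause concrete, here is a plain-JDK sketch (not Dubbo's source; the sizes are made-up values) of the difference between an effectively unbounded cached pool and a bounded one:

import java.util.concurrent.*;

public class PoolSketch {

    // What a "cached" pool amounts to: 0 core threads, an unbounded maximum,
    // and a SynchronousQueue, so every burst of responses spawns a new thread.
    static ExecutorService unbounded() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    // A bounded alternative: at most maxThreads workers, a finite queue,
    // and a rejection policy instead of unlimited thread creation.
    static ExecutorService bounded(int maxThreads) {
        return new ThreadPoolExecutor(maxThreads, maxThreads,
                60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1024),
                new ThreadPoolExecutor.AbortPolicy());
    }
}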


Here is one case to reproduce the problem:

Consumer A calls a service on provider B at a very high TPS, but provider B sometimes pauses and responds slowly, so the calls time out. Shortly afterwards provider B becomes fast again, and the large backlog of responses arrives at consumer A almost at the same time. In this case you will find that a huge number of consumer threads are created on the consumer side because of the current design.

You can easily reproduce this issue with the following provider sample, which simulates a provider whose timed-out responses are all sent back to the consumer at the same moment; the consumer then creates a large number of threads.

import java.util.Calendar;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MockImpl implements Mock {

    private static final Logger logger = LoggerFactory.getLogger(MockImpl.class);

    public void sleep(long ms) {
        logger.info("begin to sleep {} ms", ms);
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        logger.info("after sleep {} ms", ms);
    }

    // Sleep until the start of the next minute, so that all pending calls
    // time out and their responses come back at (almost) the same moment.
    public void sleepToNextMinute() {
        sleep(computeNextMinuteSpan());
    }

    // Milliseconds from now until the next full minute.
    public static long computeNextMinuteSpan() {
        long now = System.currentTimeMillis();
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(now);
        cal.add(Calendar.MINUTE, 1);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        return cal.getTimeInMillis() - now;
    }
}
  1. Export this provider with a short timeout, say 10 ms, and a large thread count, say 1000. If you export many providers instead of only one, the problem becomes even more obvious.
  2. In a for loop, continuously consume the sleepToNextMinute service, either from a single thread or from a thread pool.
  3. Observe the number of DubboClient threads.

Here is a sample for the consumer:

        logger.info("sleeping till next minute ......");
        Thread.sleep(computeNextMinuteSpan());
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
                for (Thread t : threadSet) {
                    if (t.getName().startsWith("DubboClientHandler")) {
                        logger.info("dubbo thread: {}",t.getName());
                        i++;
                    }
                }
                logger.info("=================================Dubbo Thread  {}===================================",i);

            }
        },0,1000, TimeUnit.MILLISECONDS);
        logger.info("mocking...");
        for (int i =0;i<10000;i++) {
            try {
                mock.sleepToNextMinute();
            } catch (Exception e) {
                //logger.error(e.getMessage());
            }
        }
        logger.info("mocking ends");

You will easily see the number of consumer threads rise sharply within a very short time, even though the provider service is being called from only one thread.

@ralf0131 (Contributor) commented Jul 2, 2018

+1 for a limited thread pool on consumer side.

@chickenlj (Contributor)

So users need a way to cap the maximum number of consumer threads.

Agreed on leaving the choice to users; the best thread pool policy may vary between scenarios. Would you mind enabling the consumer-side thread pool configuration and sending a PR?

From another perspective, you may also need to scale out your cluster on the consumer side to make sure it can cope with such a huge amount of QPS.

@Jaskey (Author) commented Jul 10, 2018

When can this issue be implemented? @chickenlj

@houbic commented Jul 16, 2018

I also hit this issue. Could I know when you can fix it, please? @chickenlj @lovepoem

@tswstarplanet (Contributor)

I want to solve this problem, and I want to add a 'threadpool' tag, but which larger tag should I add it under?

@ralf0131 (Contributor)

@tswstarplanet Please feel free to go ahead and submit your pull request! I think a 'threadpool' tag is too fine-grained and would result in too many tags.

@chickenlj (Contributor)

[image: rough sketch of Dubbo's thread pool structure]

Here is a rough drawing of Dubbo's thread pool structure; I hope it helps you understand how it works.

@tswstarplanet (Contributor) commented Jul 19, 2018

@ralf0131 Sorry, I don't fully understand your reply. Do you mean that I should not add a "threadpool" tag under any larger tag, and should instead solve this problem in some other way?

@ralf0131 (Contributor) commented Jul 19, 2018

@tswstarplanet Maybe I misunderstood you; I thought you were going to add a label called 'threadpool' :) So what do you actually mean?

@tswstarplanet (Contributor)

@ralf0131 Yes, I think adding a "threadpool" label can solve the problem. But my English may be a little poor; I don't fully understand the reply you posted this morning, and I'm not sure whether you are saying yes or no to my plan.

chickenlj modified the milestones: 2.6.3, 2.6.4 on Jul 23, 2018
cvictory pushed a commit to cvictory/dubbo that referenced this issue Jul 23, 2018
@Jaskey (Author) commented Aug 1, 2018

@chickenlj What is the bug-fix pull request? Has it been merged into master? Why is this issue closed now?

@ralf0131 (Contributor) commented Aug 1, 2018

@Jaskey #2114, which is included in the 2.6.3 release. Please check whether it solves your problem.

@HelloLyfing commented Jul 20, 2022

I am an engineer at Gejia Network (格家网络), and we recently ran into this problem as well. To discuss and share our solution, I am posting the content of my blog here for everyone to read.

In Dubbo versions before 2.7.5 (we use 2.6.7), every end-to-end connection (identified by IP + port) has its own private thread pool, which handles the responses to the consumer application's Dubbo calls. That is, the underlying communication component (Netty) only submits the response Message to that thread pool and is done, which keeps unstable Dubbo business logic from blocking messages inside Netty. However, each connection's private pool is configured with an unbounded maximum thread count (Integer.MAX_VALUE) combined with a SynchronousQueue, so once the pool cannot keep up, it keeps creating new threads to handle the queued messages.

Diagram of the existing Dubbo call flow. Note that every Channel (or, more precisely, every Client) has its own private thread pool, and these pools have no upper bound on their thread count:
[image: per-Channel private thread pools with unbounded thread counts]

The problem this design causes: as more and more threads are created, the system overhead the application spends on thread scheduling and context switching can far exceed the business overhead, so business work is not processed in time and RT rises.

This phenomenon is even more pronounced in applications with a large number of Dubbo consumer calls, such as our product-detail service. I measured it with the ab tool: under normal conditions the combined size of the product-detail application's private thread pools is a bit over 100, but under heavy call volume it surges to 1k+. For comparison, the application has only about 800 Java threads in total under everyday traffic.

Dubbo's current answer to this performance bottleneck (in Dubbo 2.7.5 and later) is to fake a pool-like executor, the ThreadlessExecutor. It accepts tasks but does not process them; instead, the business thread (the one blocked in Future.get()) actively pulls its own response task from this fake pool and executes it itself. After all, rather than leaving the business thread waiting, it might as well process the very response message it is waiting for. The corresponding code:

@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (executor != null && executor instanceof ThreadlessExecutor) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        // actively pull this call's own response task and process it on the current thread
        threadlessExecutor.waitAndDrain();
    }
    return responseFuture.get(timeout, unit);
}
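
To make the mechanism easier to follow, here is a minimal sketch of the pattern I wrote for illustration (it is not Dubbo's actual ThreadlessExecutor): execute() only parks the task in a queue, and the business thread that would otherwise be blocked drains the queue and runs the task itself.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: a stripped-down "threadless" executor.
public class ThreadlessExecutorSketch implements Executor {

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

    // Called by the I/O layer: just park the task, never spawn a thread.
    @Override
    public void execute(Runnable task) {
        queue.add(task);
    }

    // Called by the business thread that is waiting for the response:
    // block until a task arrives, then run it on the current (waiting) thread.
    public void waitAndDrain() throws InterruptedException {
        Runnable task = queue.take();
        do {
            task.run();
            task = queue.poll(); // run anything else that piled up, without blocking again
        } while (task != null);
    }
}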

The corresponding call flow is illustrated below. The first flow is the regular one; the second is the flow after adopting the ThreadlessExecutor approach (some details are simplified and abstracted for the sake of explanation).
[image: call flow before and after the ThreadlessExecutor approach]

Note that the code above introduces a new if branch (older versions do not have this if block), which means older Dubbo versions cannot adopt this solution through an SPI extension alone; you would have to modify the old version's source code.

So is there a way to solve the bottleneck described above without modifying the code, purely through an SPI extension? Our own solution: since the thread pools are private and create threads without limit, we simply optimize that part by providing one thread pool shared by all consumers (we call it the GSF thread pool) and capping both its total thread count and its task queue length.

Then the next question: what if a momentary flood of response messages is more than one pool can handle? We reuse Dubbo's existing logic: 1) if the shared GSF pool cannot keep up, fall back to the Channel's internal shared thread pool; and if obtaining the Channel's internal shared pool fails, fall back to handling the response on the current thread (i.e. Netty's worker thread). This fallback strategy is implemented with a RejectedExecutionHandler of our own.
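
Here is a rough sketch of that fallback idea (the pool sizes and the channelSharedPool parameter are placeholders; the real code is in the gist linked at the end of this comment):

import java.util.concurrent.*;

// Sketch of a bounded, shared consumer pool with a degradation strategy.
public class SharedPoolSketch {

    static ExecutorService buildSharedPool(final ExecutorService channelSharedPool) {
        RejectedExecutionHandler fallback = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
                // 1) The shared pool is saturated: try the channel's internal shared pool.
                try {
                    if (channelSharedPool != null) {
                        channelSharedPool.execute(task);
                        return;
                    }
                } catch (RejectedExecutionException ignored) {
                    // fall through to the last resort
                }
                // 2) Last resort: run on the current (Netty worker) thread.
                task.run();
            }
        };
        return new ThreadPoolExecutor(
                200, 200,                                // capped thread count (made-up value)
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2048), // bounded task queue (made-up value)
                fallback);
    }
}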

Diagram of the Dubbo call flow using the GSF shared thread pool:
[image: Dubbo call flow with the GSF shared thread pool]

I am also sharing our own threadpool implementation, in case anyone needs it:
https://gist.github.com/HelloLyfing/328217c39b51fb0fe43d6eb301d35312
