Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2019-11-22:什么是BlockingQueue?请分析一下其内部原理并谈谈它的使用场景? #194

Open
Moosphan opened this issue Nov 22, 2019 · 2 comments
Labels

Comments

@Moosphan
Copy link
Owner

No description provided.

@Moosphan Moosphan added the Java label Nov 22, 2019
@zhaoerlei1989
Copy link

前言:
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

认识BlockingQueue
阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)
  先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
  后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

  多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)

下面两幅图演示了BlockingQueue的两个常见阻塞场景:
       如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

   如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。既然BlockingQueue如此神通广大,让我们一起来见识下它的常用方法:
BlockingQueue的核心方法:
放入数据:
  offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
    则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
  offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
    加入BlockingQueue,则返回失败。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
    直到BlockingQueue里面有空间再继续.
获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入;
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
常见BlockingQueue
在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue家庭大致有哪些成员?

BlockingQueue成员详细介绍

  1. ArrayBlockingQueue
    基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
      ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

  2. LinkedBlockingQueue
    基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
    作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

@senlinxuefeng
Copy link

#关于BlockingQueue
BlockingQueue即我们所说的阻塞队列,它的实现基于ReentrantLock,通常我们谈及到阻塞队列,都会和生产者/消费者模式关联起来(这是最常用的场景),和一般的非阻塞队列区别在于实现生产者/消费者模式中不需要额外的实现线程同步和唤醒。

#ArrayBlockingQueue使用场景
特征:
基于数组实现,队列容量固定。存/取数据的操作共用一把锁(默认非公平锁),无法实现真正意义上存/取操作并行执行。

分析:
由于基于数组,容量固定所以不容易出现内存占用率过高,但是如果容量太小,取数据比存数据的速度慢,那么会造成过多的线程进入阻塞(也可以使用offer()方法达到不阻塞线程), 此外由于存取共用一把锁,所以有高并发和吞吐量的要求情况下,我们也不建议使用ArrayBlockingQueue。

使用场景:
在上述的情况下,笔者觉得它的使用场景更多应该放在项目的一些次级业务中,比如:

人事系统中员工离职/变更后,其他依赖应用进行数据同步。

在一些项目中,可能同公司的其他部门的应用服务会要求同步我们人事系统的部分组织架构数据,但是当人事系统数据发生变更后,应用的依赖方需要进行数据的同步, 这种场景下,由于员工离职/变更操作不是非常频繁,所以能有效防止线程阻塞,也基本没有并发和吞吐量的要求,所以可以将数据存放到ArrayBlockingQueue中, 由依赖方应用服务进行获取同步。

#LinkedBlockingQueue使用场景
特征:

LinkedBlockingQueue基于链表实现,队列容量默认Integer.MAX_VALUE
存/取数据的操作分别拥有独立的锁,可实现存/取并行执行。

分析:
1.基于链表,数据的新增和移除速度比数组快,但是每次存储/取出数据都会有Node对象的新建和移除,所以也存在由于GC影响性能的可能
2.默认容量非常大,所以存储数据的线程基本不会阻塞,但是如果消费速度过低,内存占用可能会飙升。
3.读/取操作锁分离,所以适合有并发和吞吐量要求的项目中

使用场景:
在项目的一些核心业务且生产和消费速度相似的场景中:

订单完成的邮件/短信提醒。

订单系统中当用户下单成功后,将信息放入ArrayBlockingQueue中,由消息推送系统取出数据进行消息推送提示用户下单成功。

如果订单的成交量非常大,那么使用ArrayBlockingQueue就会有一些问题,固定数组很容易被使用完,此时调用的线程会进入阻塞,那么可能无法及时将消息推送出去,所以使用LinkedBlockingQueue比较合适,但是要注意消费速度不能太低,不然很容易内存被使用完(一般而言不会时时刻刻生产消息, 但是需要预防消息大量堆积)

比较ArrayBlockingQueue:
实际上对于ArrayBlockingQueue和LinkedBlockingQueue在处理普通的生产者-消费者问题时,两者一般可互相替换使用。

这里也赘述下,有人可能会问为什么不用MQ,或者Redis 笔者认为:很多技术知识有相同的使用场景,是很常见的,使用MQ/Redis也好,阻塞队列也罢,我们需要考虑项目中采用哪种方案是最合适的的,如果我们有现成的MQ/Redis,且公司前辈对于功能的使用有一个很好的封装,或者业务要求必须使用MQ,那我们项目使用这些也没有问题,但是如果没有现成的MQ/Redis或者没有现成的使用封装,业务又相对单一,那我们用阻塞队列简单的写一个小功能去实现也是很不错的,当然如果你是为了学习这些中间件那就另当别论了。

#PriorityBlockingQueue使用场景
特征:
基于数组实现,队列容量最大为Integer.MAX_VALUE - 8(减8是因为数组的对象头)。

根据传入的优先级进行排序,保证按优先级来消费

分析
优先级阻塞队列中存在一次排序,根据优先级来将数据放入到头部或者尾部
排序带来的损耗因素,由二叉树最小堆排序算法来降低

使用场景:
在项目上存在优先级的业务

VIP排队购票(实现代码在文章末尾)

用户购票的时候,根据用户不同的等级,优先放到队伍的前面,当存在票源的时候,根据优先级分配

DelayQueue使用场景
特征:
DelayQueue延迟队列,基于优先级队列来实现
存储元素必须实现Delayed接口(Delayed接口继承了Comparable接口)

分析:
由于是基于优先级队列实现,但是它比较的是时间,我们可以根据需要去倒叙或者正序排列(一般都是倒叙,用于倒计时)

使用场景:

订单超时取消功能

用户下订单未支付开始倒计时,超时则释放订单中的资源,如果取消或者完成支付,我们再讲队列中的数据移除掉。

网站刷题倒计时 (实现代码在文章末尾)

逻辑同上

#SynchronousQueue使用场景
特征:
采用双栈双队列算法的无空间队列或栈
任何一个对SynchronousQueue写需要等到一个对SynchronousQueue的读操作,任何一个个读操作需要等待一个写操作
没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者。

分析:
相当于是交换通道,不存储任何元素,提供者和消费者是需要组队完成工作,缺少一个将会阻塞线程,直到等到配对为止

使用场景:

参考线程池newCachedThreadPool()。

如果我们不确定每一个来自生产者请求数量但是需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是最简洁的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程默认空闲了60秒后会被回收。

轻量级别的任务转交

比如会话转交,通常坐席需要进行会话转交,如果有坐席在线那么会为我们分配一个客服,但是如果没有,那么阻塞请求线程,一段时间后会超时或者提示当前坐席已满。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants