- 一个由优先级堆支持的无界优先级队列。内部是通过数组实现的。
- 通过完全二叉堆实现
PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。
插入元素有三个函数put, take和offer,事实上三个方法均是调用了offer方法。 插入元素不会阻塞,因为二叉堆是无界的。
- add()
public boolean add(E e) {
return offer(e);
}
- put()
public void put(E e) {
offer(e); // never need to block
}
- offer() 将要插入的元素放在数组的末尾,并通过swim方法使堆有序。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array); //根据comparable或者comparator进行堆排序
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
- take() 阻塞读取方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
- poll() 非阻塞方法,如果当前队列为空,直接返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
- dequeue() 通用的从二叉堆中读取根节点方法,通过将最小的元素替换根节点并进行sink维护堆的有序性。
/**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
- PriorityBlockingQueueProducer
public class PriorityBlockingQueueProducer implements Runnable {
private PriorityBlockingQueue<Integer> q;
public PriorityBlockingQueueProducer(PriorityBlockingQueue<Integer> q) {
super();
this.q = q;
}
@Override
public void run() {
while(true){
Random random = new Random();
int nextInt = random.nextInt(100);
q.put(nextInt);
System.out.println("Producer: put " + nextInt + " into queue...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- PriorityBlockingQueueConsumer
public class PriorityBlockingQueueConsumer implements Runnable {
private PriorityBlockingQueue<Integer> q;
public PriorityBlockingQueueConsumer(PriorityBlockingQueue<Integer> q) {
super();
this.q = q;
}
@Override
public void run() {
while(true){
try {
System.out.println("Consumer: take " + q.take() + " from queue...");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
Thread producer = new Thread(new PriorityBlockingQueueProducer(q));
producer.start();
producer.join(200);
new Thread(new PriorityBlockingQueueConsumer(q)).start();
}
}