Skip to content

Commit 9f65448

Browse files
committed
[Function add]
1.Add the conclusion and implementation of priority blcoking queue.
1 parent 7f37bd7 commit 9f65448

File tree

7 files changed

+298
-3
lines changed

7 files changed

+298
-3
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.LinkedBlockingQueue;
4+
5+
public class LinkedBlockingQueueConsumer implements Runnable {
6+
private LinkedBlockingQueue<Integer> q;
7+
public LinkedBlockingQueueConsumer(LinkedBlockingQueue<Integer> q) {
8+
super();
9+
this.q = q;
10+
}
11+
@Override
12+
public void run() {
13+
try {
14+
while(true){
15+
System.out.println("Consumer: take " + q.take() + " from queue...");
16+
Thread.sleep(10);
17+
}
18+
} catch (InterruptedException e) {
19+
e.printStackTrace();
20+
}
21+
}
22+
23+
public static void main(String[] args) throws InterruptedException {
24+
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
25+
new Thread(new LinkedBlockingQueueConsumer(queue)).start();
26+
new Thread(new LinkedBlockingQueueProducer(queue)).start();
27+
Thread.currentThread().join();
28+
System.out.println("Finish...");
29+
}
30+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.LinkedBlockingQueue;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class LinkedBlockingQueueProducer implements Runnable {
7+
private LinkedBlockingQueue<Integer> q;
8+
private AtomicInteger ai = new AtomicInteger();
9+
public LinkedBlockingQueueProducer(LinkedBlockingQueue<Integer> q) {
10+
super();
11+
this.q = q;
12+
}
13+
@Override
14+
public void run() {
15+
for(int i = 0; i < 100; i++){
16+
try {
17+
q.put(ai.get());
18+
System.out.println("Producer: put " + ai.getAndIncrement() + " into queue...");
19+
Thread.sleep(10);
20+
} catch (InterruptedException e) {
21+
e.printStackTrace();
22+
}
23+
}
24+
}
25+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.PriorityBlockingQueue;
4+
5+
public class PriorityBlockingQueueConsumer implements Runnable {
6+
7+
private PriorityBlockingQueue<Integer> q;
8+
public PriorityBlockingQueueConsumer(PriorityBlockingQueue<Integer> q) {
9+
super();
10+
this.q = q;
11+
}
12+
@Override
13+
public void run() {
14+
while(true){
15+
try {
16+
System.out.println("Consumer: take " + q.take() + " from queue...");
17+
Thread.sleep(10);
18+
} catch (InterruptedException e) {
19+
e.printStackTrace();
20+
}
21+
}
22+
}
23+
24+
public static void main(String[] args) throws InterruptedException {
25+
PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
26+
Thread producer = new Thread(new PriorityBlockingQueueProducer(q));
27+
producer.start();
28+
producer.join(200);
29+
new Thread(new PriorityBlockingQueueConsumer(q)).start();
30+
}
31+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.Random;
4+
import java.util.concurrent.PriorityBlockingQueue;
5+
6+
public class PriorityBlockingQueueProducer implements Runnable {
7+
private PriorityBlockingQueue<Integer> q;
8+
public PriorityBlockingQueueProducer(PriorityBlockingQueue<Integer> q) {
9+
super();
10+
this.q = q;
11+
}
12+
@Override
13+
public void run() {
14+
while(true){
15+
Random random = new Random();
16+
int nextInt = random.nextInt(100);
17+
q.put(nextInt);
18+
System.out.println("Producer: put " + nextInt + " into queue...");
19+
try {
20+
Thread.sleep(10);
21+
} catch (InterruptedException e) {
22+
e.printStackTrace();
23+
}
24+
}
25+
}
26+
}

DataStructrue/Queue/PriorityBlockingQueue.md

Lines changed: 129 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,27 @@
11
# PriorityBlockingQueue
2-
* 一个由优先级堆支持的无界优先级队列。
2+
* 一个由优先级堆支持的无界优先级队列。内部是通过数组实现的。
3+
* 通过[完全二叉堆](https://github.com/Seanforfun/Algorithm/blob/master/DataStructrue/Tree/%E5%AE%8C%E5%85%A8%E4%BA%8C%E5%8F%89%E6%A0%91CompleteBinaryTree.md)实现
34
>PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。
5+
6+
## 优先队列的操作
7+
### 插入元素
8+
>插入元素有三个函数put, take和offer,事实上三个方法均是调用了offer方法。
9+
>插入元素不会阻塞,因为二叉堆是无界的。
10+
* add()
11+
```Java
12+
public boolean add(E e) {
13+
return offer(e);
14+
}
15+
```
16+
17+
* put()
18+
```Java
19+
public void put(E e) {
20+
offer(e); // never need to block
21+
}
22+
```
23+
24+
* offer() 将要插入的元素放在数组的末尾,并通过swim方法使堆有序。
425
```Java
526
public boolean offer(E e) {
627
if (e == null)
@@ -14,7 +35,7 @@
1435
try {
1536
Comparator<? super E> cmp = comparator;
1637
if (cmp == null)
17-
siftUpComparable(n, e, array);
38+
siftUpComparable(n, e, array); //根据comparable或者comparator进行堆排序
1839
else
1940
siftUpUsingComparator(n, e, array, cmp);
2041
size = n + 1;
@@ -26,11 +47,116 @@
2647
}
2748
```
2849

50+
### 读取元素
51+
* take() 阻塞读取方法
52+
```Java
53+
public E take() throws InterruptedException {
54+
final ReentrantLock lock = this.lock;
55+
lock.lockInterruptibly();
56+
E result;
57+
try {
58+
while ( (result = dequeue()) == null)
59+
notEmpty.await();
60+
} finally {
61+
lock.unlock();
62+
}
63+
return result;
64+
}
65+
```
2966

67+
* poll() 非阻塞方法,如果当前队列为空,直接返回null
68+
```Java
69+
public E poll() {
70+
final ReentrantLock lock = this.lock;
71+
lock.lock();
72+
try {
73+
return dequeue();
74+
} finally {
75+
lock.unlock();
76+
}
77+
}
78+
```
3079

80+
* dequeue() 通用的从二叉堆中读取根节点方法,通过将最小的元素替换根节点并进行sink维护堆的有序性。
81+
```Java
82+
/**
83+
* Mechanics for poll(). Call only while holding lock.
84+
*/
85+
private E dequeue() {
86+
int n = size - 1;
87+
if (n < 0)
88+
return null;
89+
else {
90+
Object[] array = queue;
91+
E result = (E) array[0];
92+
E x = (E) array[n];
93+
array[n] = null;
94+
Comparator<? super E> cmp = comparator;
95+
if (cmp == null)
96+
siftDownComparable(0, x, array, n);
97+
else
98+
siftDownUsingComparator(0, x, array, n, cmp);
99+
size = n;
100+
return result;
101+
}
102+
}
103+
```
31104

105+
### Test
106+
* PriorityBlockingQueueProducer
107+
```Java
108+
public class PriorityBlockingQueueProducer implements Runnable {
109+
private PriorityBlockingQueue<Integer> q;
110+
public PriorityBlockingQueueProducer(PriorityBlockingQueue<Integer> q) {
111+
super();
112+
this.q = q;
113+
}
114+
@Override
115+
public void run() {
116+
while(true){
117+
Random random = new Random();
118+
int nextInt = random.nextInt(100);
119+
q.put(nextInt);
120+
System.out.println("Producer: put " + nextInt + " into queue...");
121+
try {
122+
Thread.sleep(10);
123+
} catch (InterruptedException e) {
124+
e.printStackTrace();
125+
}
126+
}
127+
}
128+
}
129+
```
32130

33-
131+
* PriorityBlockingQueueConsumer
132+
```Java
133+
public class PriorityBlockingQueueConsumer implements Runnable {
134+
135+
private PriorityBlockingQueue<Integer> q;
136+
public PriorityBlockingQueueConsumer(PriorityBlockingQueue<Integer> q) {
137+
super();
138+
this.q = q;
139+
}
140+
@Override
141+
public void run() {
142+
while(true){
143+
try {
144+
System.out.println("Consumer: take " + q.take() + " from queue...");
145+
Thread.sleep(10);
146+
} catch (InterruptedException e) {
147+
e.printStackTrace();
148+
}
149+
}
150+
}
151+
public static void main(String[] args) throws InterruptedException {
152+
PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
153+
Thread producer = new Thread(new PriorityBlockingQueueProducer(q));
154+
producer.start();
155+
producer.join(200);
156+
new Thread(new PriorityBlockingQueueConsumer(q)).start();
157+
}
158+
}
159+
```
34160

35161

36162

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.PriorityBlockingQueue;
4+
5+
public class PriorityBlockingQueueConsumer implements Runnable {
6+
7+
private PriorityBlockingQueue<Integer> q;
8+
public PriorityBlockingQueueConsumer(PriorityBlockingQueue<Integer> q) {
9+
super();
10+
this.q = q;
11+
}
12+
@Override
13+
public void run() {
14+
while(true){
15+
try {
16+
System.out.println("Consumer: take " + q.take() + " from queue...");
17+
Thread.sleep(10);
18+
} catch (InterruptedException e) {
19+
e.printStackTrace();
20+
}
21+
}
22+
}
23+
24+
public static void main(String[] args) throws InterruptedException {
25+
PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
26+
Thread producer = new Thread(new PriorityBlockingQueueProducer(q));
27+
producer.start();
28+
producer.join(200);
29+
new Thread(new PriorityBlockingQueueConsumer(q)).start();
30+
}
31+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.Random;
4+
import java.util.concurrent.PriorityBlockingQueue;
5+
6+
public class PriorityBlockingQueueProducer implements Runnable {
7+
private PriorityBlockingQueue<Integer> q;
8+
public PriorityBlockingQueueProducer(PriorityBlockingQueue<Integer> q) {
9+
super();
10+
this.q = q;
11+
}
12+
@Override
13+
public void run() {
14+
while(true){
15+
Random random = new Random();
16+
int nextInt = random.nextInt(100);
17+
q.put(nextInt);
18+
System.out.println("Producer: put " + nextInt + " into queue...");
19+
try {
20+
Thread.sleep(10);
21+
} catch (InterruptedException e) {
22+
e.printStackTrace();
23+
}
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)