Skip to content

Commit 7f37bd7

Browse files
committed
[Function add]
1.Add the function of LinkedBlockingList.
1 parent 190f5dc commit 7f37bd7

File tree

3 files changed

+218
-9
lines changed

3 files changed

+218
-9
lines changed
Lines changed: 163 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,70 @@
11
# LinkedBlockingQueue
22
* 一个由链接节点支持的可选有界队列。
3-
* 内部维护了一个Node类
3+
* 内部维护了一个Node类,是一个单向链表。第一个元素为空(dummy node)。
44
>LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
55
```Java
66
static class Node<E> {
77
E item;
8-
98
/**
109
* One of:
1110
* - the real successor Node
1211
* - this Node, meaning the successor is head.next
1312
* - null, meaning there is no successor (this is the last node)
1413
*/
1514
Node<E> next;
16-
1715
Node(E x) { item = x; }
1816
}
1917
```
18+
19+
## 插入元素
20+
* put() LinkedBlockingQueue的阻塞插入方法,如果队列已满,则阻塞并等待。
21+
```Java
22+
/**
23+
* Inserts the specified element at the tail of this queue, waiting if
24+
* necessary for space to become available.
25+
*
26+
* @throws InterruptedException {@inheritDoc}
27+
* @throws NullPointerException {@inheritDoc}
28+
*/
29+
public void put(E e) throws InterruptedException {
30+
if (e == null) throw new NullPointerException();
31+
// Note: convention in all put/take/etc is to preset local var
32+
// holding count negative to indicate failure unless set.
33+
int c = -1;
34+
Node<E> node = new Node<E>(e);
35+
final ReentrantLock putLock = this.putLock; //写锁
36+
final AtomicInteger count = this.count;
37+
putLock.lockInterruptibly();
38+
try {
39+
/*
40+
* Note that count is used in wait guard even though it is
41+
* not protected by lock. This works because count can
42+
* only decrease at this point (all other puts are shut
43+
* out by lock), and we (or some other waiting put) are
44+
* signalled if it ever changes from capacity. Similarly
45+
* for all other uses of count in other wait guards.
46+
*/
47+
while (count.get() == capacity) {
48+
notFull.await(); //如果队列已满,阻塞。
49+
}
50+
enqueue(node);
51+
c = count.getAndIncrement();
52+
if (c + 1 < capacity)
53+
notFull.signal();
54+
} finally {
55+
putLock.unlock();
56+
}
57+
if (c == 0)
58+
signalNotEmpty();
59+
}
60+
```
61+
62+
* offer()实现Queue接口的方法,如果当前队列已满,直接返回false。
2063
```Java
2164
public boolean offer(E e) {
2265
if (e == null) throw new NullPointerException();
2366
final AtomicInteger count = this.count; //此处的count为AtomicInteger,维护了原子性
24-
if (count.get() == capacity)
67+
if (count.get() == capacity) //如果队列已满,直接返回
2568
return false;
2669
int c = -1;
2770
Node<E> node = new Node<E>(e);
@@ -30,7 +73,7 @@ static class Node<E> {
3073
try {
3174
if (count.get() < capacity) {
3275
enqueue(node);
33-
c = count.getAndIncrement();
76+
c = count.getAndIncrement();
3477
if (c + 1 < capacity)
3578
notFull.signal();
3679
}
@@ -41,24 +84,55 @@ static class Node<E> {
4184
signalNotEmpty();
4285
return c >= 0;
4386
}
44-
87+
```
88+
89+
* enqueue() 添加方法的核心
90+
```Java
4591
private void enqueue(Node<E> node) {
4692
// assert putLock.isHeldByCurrentThread();
4793
// assert last.next == null;
4894
last = last.next = node; //在链表的结尾,添加要插入的结点。
4995
}
5096
```
97+
98+
## 取出元素
99+
* take() 和put()相对应的方法,从队列中取出元素,如果队列为空则会阻塞。
100+
```Java
101+
public E take() throws InterruptedException {
102+
E x;
103+
int c = -1;
104+
final AtomicInteger count = this.count;
105+
final ReentrantLock takeLock = this.takeLock;
106+
takeLock.lockInterruptibly();
107+
try {
108+
while (count.get() == 0) { //判断当前队列为空,condition方法开始阻塞。
109+
notEmpty.await();
110+
}
111+
x = dequeue(); //调用统一的dequeue方法从队列中读取元素。
112+
c = count.getAndDecrement();
113+
if (c > 1)
114+
notEmpty.signal();
115+
} finally {
116+
takeLock.unlock();
117+
}
118+
if (c == capacity)
119+
signalNotFull();
120+
return x;
121+
}
122+
```
123+
124+
* poll() 从队列中获取元素,如果队列中为空则返回null。
51125
```Java
52126
public E poll() {
53127
final AtomicInteger count = this.count;
54-
if (count.get() == 0)
128+
if (count.get() == 0) //如果队列为空直接返回null, point 1
55129
return null;
56130
E x = null;
57131
int c = -1;
58132
final ReentrantLock takeLock = this.takeLock;
59133
takeLock.lock();
60134
try {
61-
if (count.get() > 0) {
135+
if (count.get() > 0) { //再次判断,如果在point 1 到当前点之间队列已经变成空,直接跳过读取阶段,返回空。
62136
x = dequeue();
63137
c = count.getAndDecrement();
64138
if (c > 1)
@@ -71,4 +145,84 @@ static class Node<E> {
71145
signalNotFull();
72146
return x;
73147
}
74-
```
148+
```
149+
* dequeue() 统一的从队列头读取元素的方法
150+
>Avoid create a new node, just set value to null.
151+
```Java
152+
/**
153+
* Removes a node from head of queue.
154+
*
155+
* @return the node
156+
*/
157+
private E dequeue() {
158+
// assert takeLock.isHeldByCurrentThread();
159+
// assert head.item == null;
160+
Node<E> h = head;
161+
Node<E> first = h.next;
162+
h.next = h; // help GC
163+
head = first;
164+
E x = first.item;
165+
first.item = null;
166+
return x;
167+
}
168+
```
169+
170+
## Test
171+
* 向队列中放置100个元素不断读取,发现读取的进程最终阻塞
172+
```Java
173+
public class LinkedBlockingQueueProducer implements Runnable {
174+
private LinkedBlockingQueue<Integer> q;
175+
private AtomicInteger ai = new AtomicInteger();
176+
public LinkedBlockingQueueProducer(LinkedBlockingQueue<Integer> q) {
177+
super();
178+
this.q = q;
179+
}
180+
@Override
181+
public void run() {
182+
for(int i = 0; i < 100; i++){
183+
try {
184+
q.put(ai.get());
185+
System.out.println("Producer: put " + ai.getAndIncrement() + " into queue...");
186+
Thread.sleep(10);
187+
} catch (InterruptedException e) {
188+
e.printStackTrace();
189+
}
190+
}
191+
}
192+
}
193+
```
194+
195+
```Java
196+
public class LinkedBlockingQueueConsumer implements Runnable {
197+
private LinkedBlockingQueue<Integer> q;
198+
public LinkedBlockingQueueConsumer(LinkedBlockingQueue<Integer> q) {
199+
super();
200+
this.q = q;
201+
}
202+
@Override
203+
public void run() {
204+
try {
205+
while(true){
206+
System.out.println("Consumer: take " + q.take() + " from queue...");
207+
Thread.sleep(10);
208+
}
209+
} catch (InterruptedException e) {
210+
e.printStackTrace();
211+
}
212+
}
213+
214+
public static void main(String[] args) throws InterruptedException {
215+
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
216+
new Thread(new LinkedBlockingQueueConsumer(queue)).start();
217+
new Thread(new LinkedBlockingQueueProducer(queue)).start();
218+
Thread.currentThread().join();
219+
System.out.println("Finish...");
220+
}
221+
}
222+
```
223+
* 结果
224+
>Producer: put 98 into queue...
225+
Consumer: take 98 from queue...
226+
Producer: put 99 into queue...
227+
Consumer: take 99 from queue...
228+
读取进程最终**阻塞**
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.LinkedBlockingQueue;
4+
5+
public class LinkedBlockingQueueConsumer implements Runnable {
6+
private LinkedBlockingQueue<Integer> q;
7+
public LinkedBlockingQueueConsumer(LinkedBlockingQueue<Integer> q) {
8+
super();
9+
this.q = q;
10+
}
11+
@Override
12+
public void run() {
13+
try {
14+
while(true){
15+
System.out.println("Consumer: take " + q.take() + " from queue...");
16+
Thread.sleep(10);
17+
}
18+
} catch (InterruptedException e) {
19+
e.printStackTrace();
20+
}
21+
}
22+
23+
public static void main(String[] args) throws InterruptedException {
24+
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
25+
new Thread(new LinkedBlockingQueueConsumer(queue)).start();
26+
new Thread(new LinkedBlockingQueueProducer(queue)).start();
27+
Thread.currentThread().join();
28+
System.out.println("Finish...");
29+
}
30+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.LinkedBlockingQueue;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class LinkedBlockingQueueProducer implements Runnable {
7+
private LinkedBlockingQueue<Integer> q;
8+
private AtomicInteger ai = new AtomicInteger();
9+
public LinkedBlockingQueueProducer(LinkedBlockingQueue<Integer> q) {
10+
super();
11+
this.q = q;
12+
}
13+
@Override
14+
public void run() {
15+
for(int i = 0; i < 100; i++){
16+
try {
17+
q.put(ai.get());
18+
System.out.println("Producer: put " + ai.getAndIncrement() + " into queue...");
19+
Thread.sleep(10);
20+
} catch (InterruptedException e) {
21+
e.printStackTrace();
22+
}
23+
}
24+
}
25+
}

0 commit comments

Comments
 (0)