Skip to content

Commit 04188fa

Browse files
committed
[Function add]
1.Complete the conclusion of arrayblockingqueue.
1 parent fa8e432 commit 04188fa

File tree

3 files changed

+132
-3
lines changed

3 files changed

+132
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.ArrayBlockingQueue;
4+
5+
public class ArrayBlockingQueueConsumer implements Runnable {
6+
private ArrayBlockingQueue<Integer> queue;
7+
public ArrayBlockingQueueConsumer(ArrayBlockingQueue<Integer> queue) {
8+
super();
9+
this.queue = queue;
10+
}
11+
@Override
12+
public void run() {
13+
try {
14+
while(true){
15+
System.out.println("consumer...");
16+
// Thread.currentThread().join(10);
17+
// System.out.println("Consumer: get " + queue.take() + " from queue...");
18+
Thread.sleep(10);
19+
}
20+
} catch (InterruptedException e) {
21+
e.printStackTrace();
22+
}
23+
}
24+
25+
public static void main(String[] args) throws InterruptedException {
26+
// ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10000);
27+
new Thread().run();
28+
new Thread(()->{System.out.println("producer...");}).run();
29+
// Thread.currentThread().join();
30+
System.out.println("Finish...");
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.ArrayBlockingQueue;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class ArrayBlockingQueueProducer implements Runnable {
7+
private ArrayBlockingQueue<Integer> queue;
8+
public ArrayBlockingQueueProducer(ArrayBlockingQueue<Integer> queue) {
9+
super();
10+
this.queue = queue;
11+
}
12+
private volatile AtomicInteger ai = new AtomicInteger(0);
13+
@Override
14+
public void run() {
15+
while(true){
16+
try {
17+
System.out.println("producer...");
18+
Thread.currentThread().join(10);
19+
// queue.put(ai.getAndIncrement());
20+
// System.out.println("Producer: put " + ai.get() + " into queue...");
21+
Thread.sleep(10000);
22+
} catch (InterruptedException e) {
23+
e.printStackTrace();
24+
}
25+
}
26+
}
27+
}

DataStructrue/Queue/ArrayBlockingQueue.md

+73-3
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,42 @@
33
* 如果到达了上界,将无法添加新的元素进入。
44
* FIFO
55
>ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
6+
7+
## ArrayBlockingQueue的操作
8+
* 阻塞方法: put() <---> take()
9+
* 非阻塞方法: offer() <---> poll()
10+
11+
### 添加元素
12+
* put() put是ArrayBlockingQueue的方法(不是从Queue接口中继承来),在该方法中获取全局锁,如果队列满,将会阻塞直到有空间可以插入元素。条件(队列空,满)是通过**Condition**接口实现的。
13+
```Java
14+
/**
15+
* Inserts the specified element at the tail of this queue, waiting
16+
* for space to become available if the queue is full.
17+
*
18+
* @throws InterruptedException {@inheritDoc}
19+
* @throws NullPointerException {@inheritDoc}
20+
*/
21+
public void put(E e) throws InterruptedException {
22+
checkNotNull(e);
23+
final ReentrantLock lock = this.lock; //获取全局的锁
24+
lock.lockInterruptibly();
25+
try {
26+
while (count == items.length)
27+
notFull.await(); //如果当前的队列满了,则一直阻塞
28+
enqueue(e); //调用全局的私有入队列方法
29+
} finally {
30+
lock.unlock();
31+
}
32+
}
33+
```
34+
* offer() offer是Queue接口要求实现的方法,如果队列仍有位置允许插入,插入元素,如果队列已满,直接返回false,不会阻塞。
635
```Java
736
public boolean offer(E e) {
837
checkNotNull(e);
938
final ReentrantLock lock = this.lock;
1039
lock.lock(); //在写入的过程中获取锁
1140
try {
12-
if (count == items.length)
41+
if (count == items.length) //如果queue中的元素已经到达了上限,直接返回false
1342
return false;
1443
else {
1544
enqueue(e); //调用私有的enqueue方法
@@ -20,6 +49,16 @@
2049
}
2150
}
2251
```
52+
* add() add是Queue接口要求实现的方法,内部调用了offer(),如果队列仍有位置允许插入,插入元素,如果队列已满,抛出异常,不会阻塞。
53+
```Java
54+
public boolean add(E e) {
55+
if (offer(e))
56+
return true;
57+
else
58+
throw new IllegalStateException("Queue full");
59+
}
60+
```
61+
* enqueue() enqueue是插入队列的核心方法,维护了一个读指针,一个写指针。
2362
```Java
2463
/**
2564
* Inserts element at current put position, advances, and signals.
@@ -33,9 +72,25 @@
3372
if (++putIndex == items.length)
3473
putIndex = 0;
3574
count++;
36-
notEmpty.signal(); //取消notEmpty的await.
75+
notEmpty.signal(); //取消notEmpty的await(put方法).
76+
}
77+
```
78+
## 读取
79+
* take() take和put对应,是ArrayBlockingQueue私有的阻塞方法,在读取的过程中,如果发现队列为空,则会阻塞直到有元素可以读取。
80+
```Java
81+
public E take() throws InterruptedException {
82+
final ReentrantLock lock = this.lock;
83+
lock.lockInterruptibly();
84+
try {
85+
while (count == 0)
86+
notEmpty.await();
87+
return dequeue();
88+
} finally {
89+
lock.unlock();
90+
}
3791
}
3892
```
93+
* poll() 和offer()对应,如果没有元素,不会阻塞,返回false
3994
```Java
4095
public E poll() {
4196
final ReentrantLock lock = this.lock;
@@ -47,6 +102,8 @@
47102
}
48103
}
49104
```
105+
106+
* dequeue()
50107
```Java
51108
private E dequeue() {
52109
// assert lock.getHoldCount() == 1;
@@ -55,12 +112,25 @@
55112
@SuppressWarnings("unchecked")
56113
E x = (E) items[takeIndex];
57114
items[takeIndex] = null;
58-
if (++takeIndex == items.length)
115+
if (++takeIndex == items.length) //循环读取数组
59116
takeIndex = 0;
60117
count--;
61118
if (itrs != null)
62119
itrs.elementDequeued();
63120
notFull.signal();
64121
return x;
65122
}
123+
```
124+
125+
* peek() 并不会删除队列的第一个元素,单纯的读取值。调用的是itemAt方法
126+
```Java
127+
public E peek() {
128+
final ReentrantLock lock = this.lock;
129+
lock.lock();
130+
try {
131+
return itemAt(takeIndex); // null when queue is empty
132+
} finally {
133+
lock.unlock();
134+
}
135+
}
66136
```

0 commit comments

Comments
 (0)