Skip to content

Commit 338fc88

Browse files
committed
[Function add]
1.Conclusion and implementation of delayqueue.
1 parent 9f65448 commit 338fc88

File tree

7 files changed

+430
-0
lines changed

7 files changed

+430
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.Delayed;
4+
import java.util.concurrent.TimeUnit;
5+
6+
public class DelayMsg<V> implements Delayed{
7+
private Long expire;
8+
private Long insertTime;
9+
private V v;
10+
11+
public V getV() {
12+
return v;
13+
}
14+
public Long getInsertTime() {
15+
return insertTime;
16+
}
17+
public DelayMsg(Long expire, Long insertTime, V v){
18+
this.expire = expire;
19+
this.insertTime = insertTime;
20+
this.v = v;
21+
}
22+
@Override
23+
public int compareTo(Delayed o) {
24+
@SuppressWarnings("rawtypes")
25+
Long delta = insertTime - ((DelayMsg)o).getInsertTime();
26+
if(delta < 0) return -1;
27+
else if(delta == 0) return 0;
28+
else
29+
return 1;
30+
}
31+
32+
/* (non-Javadoc)
33+
* @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
34+
* 用于定义延时策略
35+
*/
36+
@Override
37+
public long getDelay(TimeUnit unit) {
38+
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.text.DateFormat;
4+
import java.text.SimpleDateFormat;
5+
import java.util.Date;
6+
import java.util.concurrent.DelayQueue;
7+
8+
public class DelayQueueConsumer implements Runnable {
9+
private DelayQueue<DelayMsg<Integer>> q;
10+
public DelayQueueConsumer(DelayQueue<DelayMsg<Integer>> q) {
11+
super();
12+
this.q = q;
13+
}
14+
@Override
15+
public void run() {
16+
DateFormat df = new SimpleDateFormat("HH:mm:ss");
17+
while (true) {
18+
try {
19+
System.out.println(df.format(new Date(System.currentTimeMillis())) + ": Consumer: take " + ((DelayMsg<Integer>)q.take()).getV() + " from queue...");
20+
Thread.sleep(1);
21+
} catch (InterruptedException e) {
22+
e.printStackTrace();
23+
}
24+
}
25+
}
26+
27+
public static void main(String[] args) {
28+
DelayQueue<DelayMsg<Integer>> q = new DelayQueue<>();
29+
new Thread(new DelayQueueConsumer(q)).start();
30+
new Thread(new DelayQueueProducer(q)).start();
31+
System.out.println("Main finish...");
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.text.DateFormat;
4+
import java.text.SimpleDateFormat;
5+
import java.util.Date;
6+
import java.util.concurrent.DelayQueue;
7+
8+
public class DelayQueueProducer implements Runnable {
9+
private DelayQueue<DelayMsg<Integer>> q;
10+
11+
public DelayQueueProducer(DelayQueue<DelayMsg<Integer>> q) {
12+
super();
13+
this.q = q;
14+
}
15+
16+
@Override
17+
public void run() {
18+
DateFormat df = new SimpleDateFormat("HH:mm:ss");
19+
for(int i = 0; i < 10; i++){
20+
q.offer(new DelayMsg<Integer>(System.currentTimeMillis() + 10000, System.currentTimeMillis(), i));
21+
System.out.println(df.format(new Date(System.currentTimeMillis())) + ": Producer: put " + i + " into queue...");
22+
try {
23+
Thread.sleep(1);
24+
} catch (InterruptedException e) {
25+
e.printStackTrace();
26+
}
27+
}
28+
}
29+
}

Diff for: DataStructrue/Queue/DelayMsg.java

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.Delayed;
4+
import java.util.concurrent.TimeUnit;
5+
6+
public class DelayMsg<V> implements Delayed{
7+
private Long expire;
8+
private Long insertTime;
9+
private V v;
10+
11+
public V getV() {
12+
return v;
13+
}
14+
public Long getInsertTime() {
15+
return insertTime;
16+
}
17+
public DelayMsg(Long expire, Long insertTime, V v){
18+
this.expire = expire;
19+
this.insertTime = insertTime;
20+
this.v = v;
21+
}
22+
@Override
23+
public int compareTo(Delayed o) {
24+
@SuppressWarnings("rawtypes")
25+
Long delta = insertTime - ((DelayMsg)o).getInsertTime();
26+
if(delta < 0) return -1;
27+
else if(delta == 0) return 0;
28+
else
29+
return 1;
30+
}
31+
32+
/* (non-Javadoc)
33+
* @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
34+
* 用于定义延时策略
35+
*/
36+
@Override
37+
public long getDelay(TimeUnit unit) {
38+
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
39+
}
40+
}

Diff for: DataStructrue/Queue/DelayQueue.md

+226
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
# DelayQueue
2+
>DelayQueue(**基于PriorityQueue来实现的**)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。
3+
>DelayQueue中存储的是Delayed接口的实现
4+
5+
## Delayed
6+
```Java
7+
public interface Delayed extends Comparable<Delayed> {
8+
9+
/**
10+
* Returns the remaining delay associated with this object, in the
11+
* given time unit.
12+
*
13+
* @param unit the time unit
14+
* @return the remaining delay; zero or negative values indicate
15+
* that the delay has already elapsed
16+
*/
17+
long getDelay(TimeUnit unit);
18+
}
19+
```
20+
我们要实现getDelay()方法,返回还需要等待的时间。
21+
同时Delayed接口继承了Comparable方法,所以要实现compareTo方法(用于在优先级队列中排序)。
22+
23+
### 一个Delayed接口的实现类
24+
```Java
25+
public class DelayMsg<V> implements Delayed{
26+
private Long expire; //用于存储什么时候当前对象可以生效。
27+
private Long insertTime; //当前对象插入队列的时间,用于优先级排列,基于延迟的基础上的FIFO
28+
private V v;
29+
public V getV() {
30+
return v;
31+
}
32+
public Long getInsertTime() {
33+
return insertTime;
34+
}
35+
public DelayMsg(Long expire){
36+
this.expire = expire;
37+
}
38+
@Override
39+
public int compareTo(Delayed o) {
40+
Long delta = insertTime - ((DelayMsg)o).getInsertTime();
41+
if(delta > 0) return 1;
42+
else if(delta == 0) return 0;
43+
else
44+
return -1;
45+
}
46+
47+
/* (non-Javadoc)
48+
* @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
49+
* 用于定义延时策略,根据需求定义。
50+
*/
51+
@Override
52+
public long getDelay(TimeUnit unit) {
53+
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);//判断是否当前时间已经超过了生效时间
54+
}
55+
}
56+
```
57+
58+
## DelayQueue的API
59+
### 添加元素
60+
* 因为是无界队列,所以不存在插入元素时的阻塞。
61+
* put() 通过offer()方法实现。
62+
```Java
63+
public void put(E e) {
64+
offer(e);
65+
}
66+
```
67+
68+
* add() 通过offer()方法实现。
69+
```Java
70+
public boolean add(E e) {
71+
return offer(e);
72+
}
73+
```
74+
75+
* offer() 事实上offer方法调用了priorityQueue的插入元素方法
76+
```Java
77+
public boolean offer(E e) {
78+
final ReentrantLock lock = this.lock;
79+
lock.lock();
80+
try {
81+
q.offer(e);
82+
if (q.peek() == e) { //说明当前队列中只有一个元素,解除阻塞。
83+
leader = null;
84+
available.signal();
85+
}
86+
return true;
87+
} finally {
88+
lock.unlock();
89+
}
90+
}
91+
```
92+
93+
### 读取元素
94+
* poll() 返回根节点,如果根节点为空或根节点未完成其延时要求,返回null,非阻塞
95+
```Java
96+
public E poll() {
97+
final ReentrantLock lock = this.lock;
98+
lock.lock();
99+
try {
100+
E first = q.peek(); //取出根节点
101+
if (first == null || first.getDelay(NANOSECONDS) > 0)
102+
return null;
103+
else
104+
return q.poll();//完成延时的要求,从优先队列中取出根节点并返回。
105+
} finally {
106+
lock.unlock();
107+
}
108+
}
109+
```
110+
111+
* take()
112+
>leader-follower
113+
>所有线程会有三种身份中的一种:leader和 follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为 leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将 其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法 可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
114+
115+
```Java
116+
public E take() throws InterruptedException {
117+
final ReentrantLock lock = this.lock;
118+
lock.lockInterruptibly();
119+
try {
120+
for (;;) {
121+
E first = q.peek();
122+
if (first == null) //判断队列中是否有元素
123+
available.await();
124+
else {
125+
long delay = first.getDelay(NANOSECONDS);//如果有元素,则判断有没有完成延时要求
126+
if (delay <= 0)
127+
return q.poll(); //完成延时要求,返回元素
128+
first = null; // don't retain ref while waiting
129+
if (leader != null) //如果leader不为空,则继续进入等待
130+
available.await();
131+
else { //如果leader为空,则将当前处理的线程提升为leader并继续进入等待
132+
Thread thisThread = Thread.currentThread();
133+
leader = thisThread;
134+
try {
135+
available.awaitNanos(delay);
136+
} finally {
137+
if (leader == thisThread)
138+
leader = null; //完成所有操作后,将leader置空
139+
}
140+
}
141+
}
142+
}
143+
} finally {
144+
if (leader == null && q.peek() != null)
145+
available.signal();
146+
lock.unlock();
147+
}
148+
}
149+
```
150+
151+
### Test
152+
我们创建10个元素放入延迟队列中,设置延时10s,消费者通过while(true)不断读取。
153+
* DelayQueueProducer
154+
```Java
155+
public class DelayQueueProducer implements Runnable {
156+
private DelayQueue<DelayMsg<Integer>> q;
157+
public DelayQueueProducer(DelayQueue<DelayMsg<Integer>> q) {
158+
super();
159+
this.q = q;
160+
}
161+
@Override
162+
public void run() {
163+
DateFormat df = new SimpleDateFormat("HH:mm:ss");
164+
for(int i = 0; i < 10; i++){ //创建10个元素放入延迟队列
165+
q.offer(new DelayMsg<Integer>(System.currentTimeMillis() + 10000, System.currentTimeMillis(), i));
166+
System.out.println(df.format(new Date(System.currentTimeMillis())) + ": Producer: put " + i + " into queue...");
167+
try {
168+
Thread.sleep(1);
169+
} catch (InterruptedException e) {
170+
e.printStackTrace();
171+
}
172+
}
173+
}
174+
}
175+
```
176+
* DelayQueueConsumer一直循环读取元素
177+
```Java
178+
public class DelayQueueConsumer implements Runnable {
179+
private DelayQueue<DelayMsg<Integer>> q;
180+
public DelayQueueConsumer(DelayQueue<DelayMsg<Integer>> q) {
181+
super();
182+
this.q = q;
183+
}
184+
@Override
185+
public void run() {
186+
DateFormat df = new SimpleDateFormat("HH:mm:ss");
187+
while (true) {
188+
try {
189+
System.out.println(df.format(new Date(System.currentTimeMillis())) + ": Consumer: take " + ((DelayMsg<Integer>)q.take()).getV() + " from queue...");
190+
Thread.sleep(1);
191+
} catch (InterruptedException e) {
192+
e.printStackTrace();
193+
}
194+
}
195+
}
196+
public static void main(String[] args) {
197+
DelayQueue<DelayMsg<Integer>> q = new DelayQueue<>();
198+
new Thread(new DelayQueueConsumer(q)).start();
199+
new Thread(new DelayQueueProducer(q)).start();
200+
System.out.println("Main finish...");
201+
}
202+
}
203+
```
204+
结果
205+
Main finish...
206+
15:54:00: Producer: put 0 into queue...
207+
15:54:00: Producer: put 1 into queue...
208+
15:54:00: Producer: put 2 into queue...
209+
15:54:00: Producer: put 3 into queue...
210+
15:54:00: Producer: put 4 into queue...
211+
15:54:00: Producer: put 5 into queue...
212+
15:54:00: Producer: put 6 into queue...
213+
15:54:00: Producer: put 7 into queue...
214+
15:54:00: Producer: put 8 into queue...
215+
15:54:00: Producer: put 9 into queue...
216+
15:54:00: Consumer: take 0 from queue...
217+
15:54:10: Consumer: take 1 from queue...
218+
15:54:10: Consumer: take 2 from queue...
219+
15:54:10: Consumer: take 3 from queue...
220+
15:54:10: Consumer: take 4 from queue...
221+
15:54:10: Consumer: take 5 from queue...
222+
15:54:10: Consumer: take 6 from queue...
223+
15:54:10: Consumer: take 7 from queue...
224+
15:54:10: Consumer: take 8 from queue...
225+
15:54:10: Consumer: take 9 from queue...
226+
>延迟10s才从队列中取出元素,之后因为队列为空开始阻塞。

0 commit comments

Comments
 (0)