Skip to content

Commit 8c4f13b

Browse files
committed
[Function add]
1.Implementation of Concurrent Linked Queue.
1 parent 9d648f6 commit 8c4f13b

File tree

5 files changed

+182
-1
lines changed

5 files changed

+182
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.ConcurrentLinkedQueue;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class ConcurrentLinkQueueProducer implements Runnable{
7+
private ConcurrentLinkedQueue<Integer> q;
8+
public ConcurrentLinkQueueProducer(ConcurrentLinkedQueue<Integer> q) {
9+
super();
10+
this.q = q;
11+
}
12+
@Override
13+
public void run() {
14+
AtomicInteger al = new AtomicInteger(0);
15+
while(true){
16+
q.offer(al.get());
17+
System.out.println("Producer: put " + al.getAndIncrement() + " into queue...");
18+
try {
19+
Thread.sleep(10);
20+
} catch (InterruptedException e) {
21+
e.printStackTrace();
22+
}
23+
}
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.ConcurrentLinkedQueue;
4+
5+
public class ConcurrentLinkedQueueConsumer implements Runnable {
6+
private ConcurrentLinkedQueue<Integer> q;
7+
public ConcurrentLinkedQueueConsumer(ConcurrentLinkedQueue<Integer> q) {
8+
super();
9+
this.q = q;
10+
}
11+
@Override
12+
public void run() {
13+
try {
14+
while(true){
15+
System.out.println("Consumer: get " + q.poll() + " from queue...");
16+
Thread.sleep(10);
17+
}
18+
} catch (InterruptedException e) {
19+
e.printStackTrace();
20+
}
21+
}
22+
23+
public static void main(String[] args) throws InterruptedException {
24+
ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
25+
Thread producer = new Thread(new ConcurrentLinkQueueProducer(q));
26+
producer.start();
27+
producer.join(100);
28+
new Thread(new ConcurrentLinkedQueueConsumer(q)).start();
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.ConcurrentLinkedQueue;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class ConcurrentLinkQueueProducer implements Runnable{
7+
private ConcurrentLinkedQueue<Integer> q;
8+
public ConcurrentLinkQueueProducer(ConcurrentLinkedQueue<Integer> q) {
9+
super();
10+
this.q = q;
11+
}
12+
@Override
13+
public void run() {
14+
AtomicInteger al = new AtomicInteger(0);
15+
while(true){
16+
q.offer(al.get());
17+
System.out.println("Producer: put " + al.getAndIncrement() + " into queue...");
18+
try {
19+
Thread.sleep(10);
20+
} catch (InterruptedException e) {
21+
e.printStackTrace();
22+
}
23+
}
24+
}
25+
}

DataStructrue/Queue/ConcurrentLinkedQueue.md

+72-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public final boolean compareAndSet(long expect, long update) {
112112

113113
### 出队列
114114
![cdl](https://i.imgur.com/G5Irgel.jpg)
115-
* poll()
115+
* poll() 出队列不会阻塞,如果队列为空则返回null。
116116
```Java
117117
public E poll() {
118118
restartFromHead:
@@ -142,4 +142,75 @@ public final boolean compareAndSet(long expect, long update) {
142142
}
143143
}
144144
}
145+
```
146+
147+
### 其他方法
148+
* size() 检查队列大小,O(n),为了保证并发性不得不每次都遍历链表。
149+
```Java
150+
public int size() { //返回检查队列的大小
151+
int count = 0;
152+
for (Node<E> p = first(); p != null; p = succ(p))
153+
if (p.item != null)
154+
// Collection.size() spec says to max out
155+
if (++count == Integer.MAX_VALUE)
156+
break;
157+
return count;
158+
}
159+
```
160+
161+
* isEmpty() 判断链表非空,O(1)级别
162+
163+
### Test
164+
* 生产者
165+
```Java
166+
public class ConcurrentLinkQueueProducer implements Runnable{
167+
private ConcurrentLinkedQueue<Integer> q;
168+
public ConcurrentLinkQueueProducer(ConcurrentLinkedQueue<Integer> q) {
169+
super();
170+
this.q = q;
171+
}
172+
@Override
173+
public void run() {
174+
AtomicInteger al = new AtomicInteger(0);//通过AtomicInteger保证原子性,避免锁。
175+
while(true){
176+
q.offer(al.get());
177+
System.out.println("Producer: put " + al.getAndIncrement() + " into queue...");
178+
try {
179+
Thread.sleep(10);
180+
} catch (InterruptedException e) {
181+
e.printStackTrace();
182+
}
183+
}
184+
}
185+
}
186+
```
187+
188+
* 消费者
189+
```Java
190+
public class ConcurrentLinkedQueueConsumer implements Runnable {
191+
private ConcurrentLinkedQueue<Integer> q;
192+
public ConcurrentLinkedQueueConsumer(ConcurrentLinkedQueue<Integer> q) {
193+
super();
194+
this.q = q;
195+
}
196+
@Override
197+
public void run() {
198+
try {
199+
while(true){
200+
System.out.println("Consumer: get " + q.poll() + " from queue...");
201+
Thread.sleep(10);
202+
}
203+
} catch (InterruptedException e) {
204+
e.printStackTrace();
205+
}
206+
}
207+
208+
public static void main(String[] args) throws InterruptedException {
209+
ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
210+
Thread producer = new Thread(new ConcurrentLinkQueueProducer(q));
211+
producer.start();
212+
producer.join(100);
213+
new Thread(new ConcurrentLinkedQueueConsumer(q)).start();
214+
}
215+
}
145216
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package ca.mcmaster.queue.arrayblockingqueue;
2+
3+
import java.util.concurrent.ConcurrentLinkedQueue;
4+
5+
public class ConcurrentLinkedQueueConsumer implements Runnable {
6+
private ConcurrentLinkedQueue<Integer> q;
7+
public ConcurrentLinkedQueueConsumer(ConcurrentLinkedQueue<Integer> q) {
8+
super();
9+
this.q = q;
10+
}
11+
@Override
12+
public void run() {
13+
try {
14+
while(true){
15+
System.out.println("Consumer: get " + q.poll() + " from queue...");
16+
Thread.sleep(10);
17+
}
18+
} catch (InterruptedException e) {
19+
e.printStackTrace();
20+
}
21+
}
22+
23+
public static void main(String[] args) throws InterruptedException {
24+
ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
25+
Thread producer = new Thread(new ConcurrentLinkQueueProducer(q));
26+
producer.start();
27+
producer.join(100);
28+
new Thread(new ConcurrentLinkedQueueConsumer(q)).start();
29+
}
30+
}

0 commit comments

Comments
 (0)