Skip to content

Commit 3be3497

Browse files
committed
add CountDownLatch & CyclicBarrier & Semaphore
1 parent ce4489e commit 3be3497

File tree

8 files changed

+367
-0
lines changed

8 files changed

+367
-0
lines changed

README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,34 @@
11
# java-concurrent-programming-explain
22

3+
## Thread.join()
4+
5+
- 子线程执行完成才执行主线程
6+
- [ThreadJoinTest](module1/src/edu/maskleo/module1/ThreadJoinTest.java)
7+
8+
9+
## CountDownLatch
10+
11+
- 所有子线程完成了阻塞 `count` 计数之后就开始调用主线程的 `latch.await()` 之后的方法
12+
- [CountDownLatchTest](module1/src/edu/maskleo/module1/CyclicBarrierTest.java)
13+
14+
## CyclicBarrier
15+
16+
- 子线程完成等待之后,调用指定的线程执行,[示例](module1/src/edu/maskleo/module1/CyclicBarrierTest.java),一个类完成上述功能,[相关示例](module1/src/edu/maskleo/module1/CyclicBarrierTest2.java).
17+
18+
## CountDownLatch & CyclicBarrier
19+
20+
- `CountDownLatch``CyclicBarrier` 都能够实现线程之间的等待,只不过它们侧重点不同:
21+
- `CountDownLatch` 一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
22+
23+
-`CyclicBarrier` 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
24+
25+
- 另外,`CountDownLatch` 是不能够重用的,而 `CyclicBarrier` 是可以重用的。
26+
27+
## Semaphore
28+
29+
- Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限。
30+
- 初始化时设置大小, 线程每次获取和存放回去都有数量可选。
31+
332
## 資料
433

534
- [The j.u.c Synchronizer Framework中文翻译版](http://ifeve.com/aqs/)
@@ -10,6 +39,7 @@
1039

1140
- [AQS解析(2)](https://ryan-hou.github.io/2018/06/13/AQS%E8%A7%A3%E6%9E%90-2/)
1241

42+
- [CountDownLatch、CyclicBarrier和 Semaphore](http://www.importnew.com/21889.html)
1343

1444
## LICENSE
1545

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package edu.maskleo.module1;
2+
3+
import java.util.Random;
4+
import java.util.concurrent.CountDownLatch;
5+
6+
public class CountDownLatchTest {
7+
8+
public static void main(String[] args) {
9+
CountDownLatch latch = new CountDownLatch(4);
10+
Thread t1 = new Thread(new InnerThread(latch),"t1");
11+
Thread t2 = new Thread(new InnerThread(latch),"t2");
12+
Thread t3 = new Thread(new InnerThread(latch),"t3");
13+
Thread t4 = new Thread(new InnerThread(latch),"t4");
14+
t1.start();
15+
t2.start();
16+
t3.start();
17+
t4.start();
18+
try {
19+
latch.await();
20+
}catch (Exception e){
21+
;
22+
}
23+
System.out.println("all finish!");
24+
}
25+
26+
static class InnerThread implements Runnable{
27+
28+
private CountDownLatch latch;
29+
30+
public InnerThread(CountDownLatch latch){
31+
this.latch = latch;
32+
}
33+
34+
@Override
35+
public void run() {
36+
System.out.println(Thread.currentThread().getName() + "begin to do something !");
37+
try {
38+
Thread.sleep(new Random().nextInt(1000));
39+
}catch (Exception e){
40+
;
41+
}
42+
System.out.println(Thread.currentThread().getName() + "finish !");
43+
latch.countDown();
44+
}
45+
}
46+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package edu.maskleo.module1;
2+
3+
import java.util.concurrent.CyclicBarrier;
4+
5+
public class CyclicBarrierTest {
6+
7+
public static void main(String[] args) {
8+
CyclicBarrier barrier = new CyclicBarrier(4, () -> System.out.println("xxxxx"));
9+
Thread t1 = new Thread(new InnerThread(barrier), "t1");
10+
Thread t2 = new Thread(new InnerThread(barrier), "t2");
11+
Thread t3 = new Thread(new InnerThread(barrier), "t3");
12+
Thread t4 = new Thread(new InnerThread(barrier), "t4");
13+
t1.start();
14+
t2.start();
15+
t3.start();
16+
t4.start();
17+
System.out.println("all finish!");
18+
}
19+
20+
static class InnerThread implements Runnable{
21+
22+
private CyclicBarrier barrier;
23+
24+
public InnerThread(CyclicBarrier barrier){
25+
this.barrier = barrier;
26+
}
27+
28+
@Override
29+
public void run() {
30+
System.out.println(Thread.currentThread().getName() + " begin to do something !");
31+
try {
32+
barrier.await();
33+
}catch (Exception e){
34+
;
35+
}
36+
System.out.println(Thread.currentThread().getName() + " finish !");
37+
}
38+
}
39+
40+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package edu.maskleo.module1;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.*;
5+
6+
public class CyclicBarrierTest2 implements Runnable {
7+
/**
8+
* 创建4个屏障,处理完之后执行当前类的run方法
9+
*/
10+
private CyclicBarrier c = new CyclicBarrier(4, this);
11+
/**
12+
* 假设只有4个sheet,所以只启动4个线程
13+
*/
14+
private Executor executor = Executors.newFixedThreadPool(4);
15+
/**
16+
* 保存每个sheet计算出的银流结果
17+
*/
18+
private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new
19+
ConcurrentHashMap<>();
20+
21+
public static void main(String[] args) {
22+
CyclicBarrierTest2 bankWaterCount = new CyclicBarrierTest2();
23+
bankWaterCount.count();
24+
}
25+
26+
private void count() {
27+
for (int i = 0; i < 4; i++) {
28+
executor.execute(() -> {
29+
// 计算当前sheet的银流数据,计算代码省略
30+
sheetBankWaterCount
31+
.put(Thread.currentThread().getName(), 1);
32+
// 银流计算完成,插入一个屏障
33+
try {
34+
c.await();
35+
} catch (InterruptedException |
36+
BrokenBarrierException e) {
37+
e.printStackTrace();
38+
}
39+
});
40+
}
41+
}
42+
43+
@Override
44+
public void run() {
45+
int result = 0;
46+
// 汇总每个sheet计算出的结果
47+
for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
48+
result += sheet.getValue();
49+
}
50+
// 将结果输出
51+
sheetBankWaterCount.put("result", result);
52+
System.out.println(result);
53+
}
54+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package edu.maskleo.module1;
2+
3+
import java.util.concurrent.BrokenBarrierException;
4+
import java.util.concurrent.CyclicBarrier;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.TimeoutException;
7+
8+
public class CyclicBarrierTest3 {
9+
10+
public static void main(String[] args) {
11+
int N = 4;
12+
CyclicBarrier barrier = new CyclicBarrier(N);
13+
14+
for (int i = 0; i < N; i++) {
15+
if (i < N - 1)
16+
new Thread(new Writer(barrier), "t" + i).start();
17+
else {
18+
try {
19+
Thread.sleep(5000);
20+
} catch (InterruptedException e) {
21+
e.printStackTrace();
22+
}
23+
new Thread(new Writer(barrier), "t" + i).start();
24+
}
25+
}
26+
}
27+
28+
static class Writer implements Runnable {
29+
private CyclicBarrier cyclicBarrier;
30+
31+
public Writer(CyclicBarrier cyclicBarrier) {
32+
this.cyclicBarrier = cyclicBarrier;
33+
}
34+
35+
@Override
36+
public void run() {
37+
System.out.println("线程" + Thread.currentThread().getName() + " 正在写入数据...");
38+
try {
39+
Thread.sleep(5000); //以睡眠来模拟写入数据操作
40+
System.out.println("线程" + Thread.currentThread().getName() + " 写入数据完毕,等待其他线程写入完毕");
41+
try {
42+
cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
43+
} catch (TimeoutException e) {
44+
System.out.println(Thread.currentThread().getName() + " 1111");
45+
}
46+
} catch (InterruptedException e) {
47+
System.out.println(Thread.currentThread().getName() + " 2222");
48+
} catch (BrokenBarrierException e) {
49+
System.out.println(Thread.currentThread().getName() + " 3333");
50+
}
51+
System.out.println(Thread.currentThread().getName() + " 所有线程写入完毕,继续处理其他任务...");
52+
}
53+
}
54+
55+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package edu.maskleo.module1;
2+
3+
import java.util.concurrent.BrokenBarrierException;
4+
import java.util.concurrent.CyclicBarrier;
5+
6+
public class CyclicBarrierTest4 {
7+
8+
public static void main(String[] args) {
9+
int N = 4;
10+
CyclicBarrier barrier = new CyclicBarrier(N);
11+
12+
for (int i = 0; i < N; i++) {
13+
new Writer(barrier).start();
14+
}
15+
16+
try {
17+
Thread.sleep(2500);
18+
} catch (InterruptedException e) {
19+
e.printStackTrace();
20+
}
21+
22+
System.out.println("CyclicBarrier重用");
23+
24+
for (int i = 0; i < N; i++) {
25+
new Writer(barrier).start();
26+
}
27+
}
28+
29+
static class Writer extends Thread {
30+
private CyclicBarrier cyclicBarrier;
31+
32+
public Writer(CyclicBarrier cyclicBarrier) {
33+
this.cyclicBarrier = cyclicBarrier;
34+
}
35+
36+
@Override
37+
public void run() {
38+
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
39+
try {
40+
Thread.sleep(500); //以睡眠来模拟写入数据操作
41+
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
42+
43+
cyclicBarrier.await();
44+
} catch (InterruptedException e) {
45+
e.printStackTrace();
46+
} catch (BrokenBarrierException e) {
47+
e.printStackTrace();
48+
}
49+
System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
50+
}
51+
}
52+
53+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package edu.maskleo.module1;
2+
3+
import java.util.Random;
4+
import java.util.concurrent.Semaphore;
5+
6+
public class SemaphoreTest {
7+
8+
final static Object lock = new Object();
9+
10+
final static int NUM = 10;
11+
12+
public static void main(String[] args) {
13+
int N = 10; //工人数
14+
Semaphore semaphore = new Semaphore(NUM); //机器数目
15+
for (int i = 0; i < N; i++)
16+
new Worker(i, semaphore).start();
17+
}
18+
19+
static class Worker extends Thread {
20+
private int num;
21+
private Semaphore semaphore;
22+
23+
public Worker(int num, Semaphore semaphore) {
24+
this.num = num;
25+
this.semaphore = semaphore;
26+
}
27+
28+
@Override
29+
public void run() {
30+
try {
31+
semaphore.acquire(5);
32+
System.out.println("工人" + this.num + "占用2个机器在生产...");
33+
Thread.sleep(new Random().nextInt(10));
34+
System.out.println("工人" + this.num + "释放出机器");
35+
semaphore.release(2);
36+
Thread.sleep(new Random().nextInt(1000));
37+
System.out.println("工人" + this.num + "释放出机器");
38+
semaphore.release(3);
39+
} catch (InterruptedException e) {
40+
e.printStackTrace();
41+
}
42+
}
43+
}
44+
45+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package edu.maskleo.module1;
2+
3+
import java.util.Random;
4+
5+
public class ThreadJoinTest {
6+
7+
public static void main(String[] args) {
8+
Thread t1 = new Thread(new InnerThread(),"t1");
9+
Thread t2 = new Thread(new InnerThread(),"t2");
10+
Thread t3 = new Thread(new InnerThread(),"t3");
11+
Thread t4 = new Thread(new InnerThread(),"t4");
12+
t1.start();
13+
t2.start();
14+
t3.start();
15+
t4.start();
16+
join(t1, t2, t3, t4);
17+
System.out.println("all finish!");
18+
}
19+
20+
private static void join(Thread t1, Thread t2, Thread t3, Thread t4) {
21+
try {
22+
t1.join();
23+
t2.join();
24+
t3.join();
25+
t4.join();
26+
}catch (Exception e){
27+
}
28+
}
29+
30+
static class InnerThread implements Runnable{
31+
@Override
32+
public void run() {
33+
System.out.println(Thread.currentThread().getName() + "begin to do something !");
34+
try {
35+
Thread.sleep(new Random().nextInt(1000));
36+
}catch (Exception e){
37+
;
38+
}
39+
System.out.println(Thread.currentThread().getName() + "finish !");
40+
}
41+
}
42+
43+
44+
}

0 commit comments

Comments
 (0)