-
Notifications
You must be signed in to change notification settings - Fork 1
/
MultiThreadAndLock.txt
1073 lines (964 loc) · 49.8 KB
/
MultiThreadAndLock.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
--------------------------------------------------Multi-thread-----------------------------------------------------------
*************************************************************************************************************************
1.通过继承Thread来实现多线程,并重写run方法:
public class MyThread extends Thread {
private int i = 0;
@Override
public void run() {
for (; i < 100; i++) {
System.out.println(getName() + " " + i);
}
}
public static void main(String[] args) {
for(int i = 0; i < 100; i++){
System.out.println(Thread.currentThread().getName() + " " + i);
if(i == 20){
new MyThread().start();
new MyThread().start();
}
}
}
}
打印的结果,有三条线程同时执行,main线程和thread-0和thread-1,均会打印出从0-99的值。
每次启动新的线程在heap中创建了一个新的Mythread对象,每个Mythread对象都有私有的i属性,所以不会产生多线程安全问题。
源码:
@Override
public void run() { //Thread源码中的run方法是调用了target的run方法,而target是一个callable对象,如果没有传入callable对象这个方法什么都不做,所以利用overide在子类中对该方法进行定义。
if (target != null) {
target.run();
}
}
public synchronized void start() { //Change the thread from NEW to Runable.
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0) //该线程对象是否处于NEW state。
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this); //每个线程对象都有一个私有的ThreadGroup对象,将当前线程加入线程组,线程组被初始化时nUnstartedThreads被设置,现在可以--
boolean started = false;
try {
start0(); -->private native void start0(); //调用C或是C++来启动当前线程。
started = true;
} finally {
try {
if (!started) { //如果start失败,即C语言失败,修改ThreadGroup中对应的参数。C中对应的线程如果启动失败一般都是Core dump,不知道如何捕捉异常,需要研究JVM。
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
*************************************************************************************************************************
2.实现Runnable接口来实现多线程:
package ca.mcmaster.multithread;
/**
* @author SeanForFun E-mail:xiaob6@mcmaster.ca
* @date Apr 7, 2018 6:43:41 PM
* @version 1.0
*/
public class MyRunnable implements Runnable {
private int i = 0;
public void print(){
System.out.println(Thread.currentThread().getName() + " " + i);
}
@Override
public void run() { //重写run()方法,将线程的业务写在run方法中
for(; i < 100; i++){
print();
}
}
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
for(int i = 0; i < 100; i++){
System.out.println(Thread.currentThread() + " " + i);
if(i == 20){
new Thread(myRunnable, "thread-0").start(); //将我们的callable实现类传入Thread中,在run()方法中会调用Runable的方法
new Thread(myRunnable, "thread-1").start();
new Thread(myRunnable, "thread-2").start();
new Thread(myRunnable, "thread-3").start();
}
}
}
}
打印结果:
thread-1 90
thread-3 89
Thread[main,5,main] 57
thread-3 95
1.首先发现i作为runnable对象是共享的,会造成线程安全问题。但是因为++操作是原子性的,所以不会出现读写问题。
2.不是按顺序打印的,说明了对打印资源的获取出现了问题,此时打印资源被认为是一种临界资源(critical resource)。
解决方案:
synchronized public void print(){
System.out.println(Thread.currentThread().getName() + " " + i);
}
这是一种较为慢的加锁方式,synchronized的具体细节将会在下文讨论。
总结:
该方法的实质和继承Thread方法没有区别,但是是实现接口而不是继承类,设计上更加清洁。
*************************************************************************************************************************
3.实现Callable接口实现多线程:
package ca.mcmaster.multithread;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
* @author SeanForFun E-mail:xiaob6@mcmaster.ca
* @date Apr 7, 2018 7:29:10 PM
* @version 1.0
*/
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int i = 0;
for (i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
return i;
}
public static void main(String[] args) {
MyCallable myCallable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(myCallable); //一个FutureTask只能接受一个Callable对象(已经验证),不然会造成返回值的同步问题(个人猜测)。
for(int i = 0; i < 100; i ++){
System.out.println(Thread.currentThread().getName() + " " + i);
if(i == 20){
new Thread(futureTask, "callable-0").start();
}
}
try {
System.out.println("Value returned from thread: " + futureTask.get()); //通过get方法获取线程的返回值。
} catch (Exception e) {
e.printStackTrace();
}
}
}
源码:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; //一个FutureTask只能接受一个Callable对象
this.state = NEW; // ensure visibility of callable
}
public void run() { //Thread对象调用run()方法
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran; //调用call()方法。
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); //将局部变量result更新到线程的属性outcome。
} //private Object outcome; non-volatile, protected by state reads/writes
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
总结:
1.该方法也是通过实现接口实现多线程。相对清洁。
2.实际上是利用了FutureTask类的对象。每个FutureTask对象会接收一个Callable的实现对象,不能接收多个!
3.实际上是和Runnable方法的实现方式一致,因为FutureTask是Runnable接口的子类。
--------------------------------------------------Lifecycle--------------------------------------------------------------
Five States in life cycle:
新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)
1. 新建状态,当程序使用new关键字创建了一个线程之后,该线程就处于新建状态,此时仅由JVM为其分配内存,并初始化其成员变量的值
Thread t = new Thread();
此时在堆中有一个Thread的实现对象,所有空间和资源已经准备就绪。
2. 就绪状态,当线程对象调用了start()方法之后,该线程处于就绪状态。Java虚拟机会为其创建方法调用栈和程序计数器,等待调度运行
程序计数器program counter:
微机原理中学习的知识,用于存储下一条汇编的地址。
方法栈:
每个函数都会在内存中开辟一块空间,并且初始化函数栈中的参数。
3. 运行状态,如果处于就绪状态的线程获得了CPU,开始执行run()方法的线程执行体,则该线程处于运行状态
执行线程所需要的业务,占用CPU资源。
4. 阻塞状态,当处于运行状态的线程失去所占用资源之后,便进入阻塞状态
① 线程调用sleep()方法主动放弃所占用的处理器资源
② 线程调用了一个阻塞式IO方法,在该方法返回之前,该线程被阻塞
③ 线程试图获得一个同步监视器(synchronized),但该同步监视器正被其他线程所持有。这种方法以最大的代价维护了线程安全。
④ 线程在等待某个通知(notify) //public final native void notify();
⑤ 程序调用了线程的suspend()方法将该线程挂起。但这个方法容易导致死锁,所以应该尽量避免使用该方法
5. 死亡状态:
① run()或call()方法执行完成,线程正常结束。
② 线程抛出一个未捕获的Exception或Error。
③ 直接调用该线程stop()方法来结束该线程——该方法容易导致死锁,通常不推荐使用。
总结:线程的生命周期看上去抽象切不好理解,却是代码实现的重要依据,在多线程编程时需要深刻理解每个线程在某一时间的生命周期状态从而实现线程安全和业务功能。
--------------------------------------------------ThreadControl--------------------------------------------------------------
1.join:Thread提供了让一个线程等待另一个线程完成的方法join()方法。当在某个程序执行流中调用其他线程的join()方法时,调用线程将被阻塞,直到被join()方法加入的join线程执行完为止。join()方法通常由使用线程的程序调用,以将大问题划分成许多小问题,每个小问题分配一个线程。当所有的小问题都得到处理后,再调用主线程来进一步操作。
public static void main(String[] args) throws Exception {
// 启动子线程
new JoinThread("新线程").start();
for (int i = 0; i < 100; i++) {
if (i == 20) {
JoinThread jt = new JoinThread("被Join的线程");
jt.start();
// main线程调用了jt线程的join()方法,main线程
// 必须等jt执行结束才会向下执行
jt.join();
}
System.out.println(Thread.currentThread().getName() + "" + i);
}
}
->在A线程中调用B线程的join方法,A线程会等待B线程完成后再继续执行;
ca.mcmaster.multithread.threadcontroll.JoinTestThread
2.setDeamon:设置为守护线程:
public class DaemonThread extends Thread {
// 定义后台线程的线程执行体与普通线程没有任何区别
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println(getName() + "" + i);
}
}
public static void main(String[] args) {
DaemonThread t = new DaemonThread();
// 将此线程设置成后台线程
t.setDaemon(true);
// 启动后台线程
t.start();
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "" + i);
}
// -----程序执行到此处,前台线程(main线程)结束------
// 后台线程也应该随之结束
}
}
3.sleep:让该线程进入睡眠,同时也交出了cpu的使用权:
public class SleepTest {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("当前时间: " + new Date());
// 调用sleep方法让当前线程暂停1s。
Thread.sleep(1000);
}
}
}
4.yield:它也是Threard类提供的一个静态方法,它也可以让当前正在执行的线程暂停,但它不会阻塞该线程,它只是将该线程转入就绪状态。yield()只是让当前线程暂停一下,让系统的线程调度器重新调度一次,完全可能的情况是:当某个线程调用了yield()方法暂停之后,线程调度器又将其调度出来重新执行。
总结就是暂时交出CPU的占用,进入Runnale态。
public class YieldTest extends Thread {
public YieldTest(String name) {
super(name);
}
// 定义run方法作为线程执行体
public void run() {
for (int i = 0; i < 50; i++) {
System.out.println(getName() + "" + i);
// 当i等于20时,使用yield方法让当前线程让步
if (i == 20) {
Thread.yield();
}
}
}
public static void main(String[] args) throws Exception {
// 启动两条并发线程
YieldTest yt1 = new YieldTest("高级");
// 将ty1线程设置成最高优先级
// yt1.setPriority(Thread.MAX_PRIORITY);
yt1.start();
YieldTest yt2 = new YieldTest("低级");
// 将yt2线程设置成最低优先级
// yt2.setPriority(Thread.MIN_PRIORITY);
yt2.start();
}
}
--------------------------------------------------Lock-----------------------------------------------------------------------
公平锁:指多个线程按照申请锁的顺序来获取锁。
非公平锁:指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。
可重入锁 ReentrantLock:可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。对于Java ReentrantLock而言, 其名字是Re entrant Lock即是重新进入锁。对于synchronized而言,也是一个可重入锁。可重入锁的一个好处是可一定程度避免死锁。
其中,synchronized和ReentrantLock均是可重入锁。
理解:可重入锁指的是
独享锁 Exclusive lock:指该锁一次只能被一个线程所持有。
共享锁 shared lock:指该锁可被多个线程所持有。
乐观锁/悲观锁
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。
悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。比如Java里面的同步原语synchronized关键字的实现就是悲观锁。
乐观锁:顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS(Compare and Swap 比较并交换)实现的。
分段锁 SegmentLock:
强烈建议阅读源码!很有趣!
分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作,ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap(JDK7与JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。当需要put元素的时候,并不是对整个HashMap进行加锁,而是先通过hashcode来知道他要放在那一个分段中,然后对这个分段进行加锁,所以当多线程put的时候,只要不是放在一个分段中,就实现了真正的并行的插入。但是,在统计size的时候,可就是获取HashMap全局信息的时候,就需要获取所有的分段锁才能统计。
自旋锁 SpinLock:
在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU
偏向锁/轻量级锁/重量级锁
这三种锁是指锁的状态,并且是针对synchronized。在Java 5通过引入锁升级的机制来实现高效synchronized。这三种锁的状态是通过对象监视器在对象头中的字段来表明的。
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。
---------------------------------------------------AQS-----------------------------------------------------------------------
1.AbstractQueuedSynchronized:抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架
java.util.concurrent.locks.AbstractQueuedSynchronizer
* +------+ prev +-----+ +-----+
resource * head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
volatile resource; //volatile是一个给JVM的提示i,告知JVM该资源非常抢手,尽快更新(写回内存)
所有的线程被排在一个FIFO队列中,按序获取资源。
---------------------------------------------------CAS-----------------------------------------------------------------------
CAS:Compare and swap
这是一种乐观锁技术,甚至可以通过无锁实现。
多个线程获取临界资源时,获取资源失败时不会进入Blocked state,同时可以再次尝试获取该锁。
实现方式:CAS操作中包含三个操作数——需要读写的内存位置(V)、进行比较的预期原值(A)和拟写入的新值(B)。如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,否则处理器不做任何操作。
源码分析:java.util.concurrent.atomic.AtomicInteger
临界资源:private volatile int value;
public final int get() {
return value; //因为value被volatile修饰了,所以保证了可见性。
}
public final int getAndIncrement() {
for (;;) { //通过while循环构成了一个类似自旋锁的结构,不断尝试获得当前资源的访问权。
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) //通过变量判断当前值是不是已经被修改,如果没有被修改的话则更新成功。
return current;
}
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) { //判断是否可以被更新,通过比较当前值是否和期待值一致,是的话即可以更新。
return unsafe.compareAndSwapInt(this, valueOffset, expect, update); //native
}
总结:CAS是一种实现线程安全的技术。由于减少了从Runnable到Block的过程,减少了用户态与内核态的切换,减少了系统级别的开销。
但是,重复尝试获取资源的方式也是一种自旋的体现,实际上也增加了一定的CPU开销。
---------------------------------------------------Synchronized--------------------------------------------------------------
1.首先可以确认,synchronized是可重入锁,在get中调用了set,而此时的get并没有解锁而set又需要获取锁,同事发现set执行了,说明在同一个代码块中,内部的代码会自动获取synchronized锁,我们称之为可重入锁。
public class RtLock implements Runnable {
private int i = 0;
private void set(){
for(int j = 0; j < 20; j++)
System.out.println(Thread.currentThread() + " " + i++);
}
public synchronized void get(){
System.out.println(Thread.currentThread() + " get");
set();
}
@Override
public synchronized void run() {
get();
}
public static void main(String[] args) {
RtLock rtLock = new RtLock();
for(int i = 0; i < 2; i++){
new Thread(rtLock, "thread-"+i).start();
}
}
}
结果:
Thread[thread-0,5,main] get
Thread[thread-0,5,main] 0
Thread[thread-0,5,main] 1
Thread[thread-1,5,main] get
Thread[thread-1,5,main] 2
Thread[thread-1,5,main] 3
2.对于内部的set方法是否也需要上锁的问题:
public class RtLock implements Runnable {
private int i = 0;
private void set(){
for(int j = 0; j < 10; j++)
System.out.println(Thread.currentThread() + " " + i++);
}
public synchronized void get(){
System.out.println(Thread.currentThread() + " get");
set();
}
@Override
public synchronized void run() {
get();
}
public static void main(String[] args) {
RtLock rtLock = new RtLock();
for(int i = 0; i < 2; i++){
new Thread(rtLock, "thread-"+i).start();
}
for(int j = 0; j < 10; j++)
rtLock.set();
}
}
结果:
Thread[thread-0,5,main] get
Thread[thread-0,5,main] 1
Thread[main,5,main] 0
Thread[thread-0,5,main] 2
Thread[main,5,main] 3
Thread[thread-0,5,main] 4
乱序,因为main方法也在抢占set的资源。
如果每个线程都跑一样的业务的话,内部方法是可以不需要上锁的,如果不同的业务交织在一起,在需要对所有的临界资源(critical source)上锁。
3.通过synchronized修饰符给代码块加锁:
synchronized(critical resource){
// TODO
}
在定义接口方法时不能使用synchronized关键字。
构造方法不能使用synchronized关键字,但可以使用synchronized代码块来进行同步。
***总结:事实上synchronized方法是在告诉JVM,不要再将资源放在缓存中了!每次直接更新到内存中。
这是一种重量级的锁,每次都会导致获取不到资源的线程挂起,而挂起和恢复的过程都是用户态和内核态的切换,十分占用系统资源。
---------------------------------------------------ReentrantLock-------------------------------------------------------------
ReentrantLock可以被设定为公平锁和非公平锁:
源码:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public class ReentrantLockTest implements Runnable {
private ReentrantLock reentrantLock = new ReentrantLock(false);
private int value = 0;
@Override
public void run() {
reentrantLock.lock();
for(int i = 0; i < 3; i++){
updateValue();
}
reentrantLock.unlock();
}
private void updateValue() {
reentrantLock.lock();
System.out.println(Thread.currentThread() + " " + value++);
reentrantLock.unlock();
}
public static void main(String[] args) {
ReentrantLockTest rtLock = new ReentrantLockTest();
for(int i = 0; i < 10; i ++){
new Thread(rtLock).start();
}
}
}
结果:
Thread[Thread-0,5,main] 0
Thread[Thread-0,5,main] 1
Thread[Thread-0,5,main] 2
Thread[Thread-1,5,main] 3
Thread[Thread-1,5,main] 4
Thread[Thread-1,5,main] 5
Thread[Thread-2,5,main] 6
Thread[Thread-2,5,main] 7
结论:
与synchronized类似,ReentrantLock是一种可重入锁,我们发现内部的updateValue方法被运行了而外部的锁和内部的锁是同一个锁,且外部的锁没有释放。
我认为!ReentrantLock是对synchronized的一种轻量化和增强,我们无法再重写类似run方法时使用synchronized,但是我们可以使用加锁的方式实现!!!!!
---------------------------------------------------SpinLock------------------------------------------------------------------
自旋锁是一种锁设计模式,我个人认为这实际是一种无锁设计来保证线程安全。
线程级别的自旋锁:
class SpinLock {
private AtomicReference<Thread> sign = new AtomicReference<>(); //初始化的value值是null
public void lock(){
while(!sign.compareAndSet(null, Thread.currentThread())){ //如果当前的value值是空说明当前没有被锁,则可以退出当前循环进入业务
}
}
public void unlock(){
sign.compareAndSet(Thread.currentThread(), null); //将value重新置为空,允许别的线程访问。
}
}
*************************************************如何设计自己的自旋锁********************************************************
class SpinLock {
private AtomicReference<V> sign = new AtomicReference<>(); //创建临界资源锁对象,通过AtomicReference,只要是利用其中的CAS方法
public void lock(){
while(!sign.compareAndSet(expect, update)){ //锁住临界资源,即将
}
}
public void unlock(){
sign.compareAndSet(Thread.currentThread(), null); //将value重新置为空,允许别的线程访问。
}
}
*************************************************TicketLock*****************************************************************
public class TicketLock { //每次都要查询一个serviceNum 服务号,影响性能(必须要到主内存读取,并阻止其他cpu修改)。
private AtomicInteger serviceNum = new AtomicInteger();
private AtomicInteger ticketNum = new AtomicInteger();
private static final ThreadLocal<Integer> LOCAL = new ThreadLocal<Integer>(); //利用static属性,TicketLock对象均共享LOCAL对象,当myticket和分配到的ticket数相同时可以运行
public void lock() {
int myticket = ticketNum.getAndIncrement();
LOCAL.set(myticket);
while (myticket != serviceNum.get()) {
}
}
public void unlock() {
int myticket = LOCAL.get();
serviceNum.compareAndSet(myticket, myticket + 1);
}
}
*************************************************MCSLock*******************************************************************
public class MCSLock {
public static class MCSNode {
volatile MCSNode next;
volatile boolean isLocked = true;
}
private static final ThreadLocal<MCSNode> NODE = new ThreadLocal<MCSNode>();
@SuppressWarnings("unused")
private volatile MCSNode queue;
private static final AtomicReferenceFieldUpdater<MCSLock, MCSNode> UPDATER = AtomicReferenceFieldUpdater.newUpdater(MCSLock.class,
MCSNode.class, "queue");
public void lock() {
MCSNode currentNode = new MCSNode();
NODE.set(currentNode);
MCSNode preNode = UPDATER.getAndSet(this, currentNode);
if (preNode != null) {
preNode.next = currentNode;
while (currentNode.isLocked) {
}
}
}
public void unlock() {
MCSNode currentNode = NODE.get();
if (currentNode.next == null) {
if (UPDATER.compareAndSet(this, currentNode, null)) {
} else {
while (currentNode.next == null) {
}
}
} else {
currentNode.next.isLocked = false;
currentNode.next = null;
}
}
}
*************************************************CLHLock***********************************************************************
public class CLHLock {
public static class CLHNode {
private volatile boolean isLocked = true;
}
@SuppressWarnings("unused")
private volatile CLHNode tail;
private static final ThreadLocal<CLHNode> LOCAL = new ThreadLocal<CLHNode>();
private static final AtomicReferenceFieldUpdater<CLHLock, CLHNode> UPDATER = AtomicReferenceFieldUpdater.newUpdater(CLHLock.class,
CLHNode.class, "tail");
public void lock() {
CLHNode node = new CLHNode();
LOCAL.set(node);
CLHNode preNode = UPDATER.getAndSet(this, node);
if (preNode != null) {
while (preNode.isLocked) {
}
preNode = null;
LOCAL.set(node);
}
}
public void unlock() {
CLHNode node = LOCAL.get();
if (!UPDATER.compareAndSet(this, node, null)) {
node.isLocked = false;
}
node = null;
}
}
---------------------------------------------------Pass Values to thread-----------------------------------------------------
*************************************************Constructor*****************************************************************
实现:
public class PassParameterByConstructor implements Runnable {
private volatile Integer i = null;
private ReentrantLock rtLock = new ReentrantLock();
public PassParameterByConstructor(Integer i){
this.i = i;
}
@Override
public void run() {
rtLock.lock();
for (int j = 0; j < 2; j++) {
System.out.println(Thread.currentThread().getName() + " " + i++);
}
rtLock.unlock();
}
public static void main(String[] args) {
PassParameterByConstructor p = new PassParameterByConstructor(10);
for(int j = 0; j < 10; j++){
new Thread(p).start();
}
}
}
*************************************************Use variables and methods***************************************************
实现:
public class PassParaByMethod implements Runnable{
private ReentrantLock reentrantLock = new ReentrantLock();
private Integer a = null;
public void run() {
reentrantLock.lock();
for (int j = 0; j < 2; j++) {
System.out.println(Thread.currentThread().getName() + " " + a++);
}
reentrantLock.unlock();
}
private void setA(Integer a){
this.a = a;
}
public static void main(String[] args) {
PassParaByMethod r = new PassParaByMethod();
r.setA(10);
for(int i = 0; i <10; i++){
new Thread(r).start();
}
}
}
*************************************************Use ThreadLocal******************************************************
将ThreadLocal放在这儿并不准确,因为threadLocal无法从线程外部接收参数。
threadLocal更多的是用于存储线程本地的参数,这些参数和生命周期和线程的生命周期一样。
public class PassParaByThreadLocal implements Runnable {
private ReentrantLock reentrantLock = new ReentrantLock();
private static ThreadLocal<Integer> integerLocal = new ThreadLocal<>();
@Override
public void run() {
reentrantLock.lock();
integerLocal.set(10);
Integer a = integerLocal.get();
for(int i = 0; i < 2; i++){
System.out.println(Thread.currentThread().getName() + " " + a++);
}
reentrantLock.unlock();
}
public static void main(String[] args) {
PassParaByThreadLocal p = new PassParaByThreadLocal();
for(int i = 0; i < 10; i++){
new Thread(p).start();
}
}
}
----------------------------------------------------ThreadLocal--------------------------------------------------------------
解析文章:https://mp.weixin.qq.com/s?__biz=MzA3MDExNzcyNA==&mid=2650392118&idx=1&sn=a2144a19bdeba48001f4f76f423e25d9&scene=0
public class ThreadLocalTest implements Runnable{
private static ThreadLocal<Long> longLocal = new ThreadLocal<Long>();
private static ThreadLocal<String> stringLocal = new ThreadLocal<String>();
@Override
public void run() {
stringLocal.set(Thread.currentThread().getName());
longLocal.set(Thread.currentThread().getId());
print();
}
private synchronized void print() {
System.out.println("------------------------------------------------------------------------------------------");
System.out.println(Thread.currentThread().getName() + " " + stringLocal.get() + " " + longLocal.get());
System.out.println("------------------------------------------------------------------------------------------");
}
public static void main(String[] args) {
ThreadLocalTest threadLocalTest = new ThreadLocalTest();
for(int i = 0; i < 4; i++){
new Thread(threadLocalTest).start();
}
}
}
源码解析:
ThreadLocal的实现是一种典型的空间换去时间的做法,在这种情况下确定了ThreadLocal的生命周期。
据说在JDK1.3以前是一种维护了一个全局的ThreadLocal,这种情况下是时间换去空间(引索花费时间)。
个人的理解是,线程这种级别的操作,大多是通过线程池维护的,总数量是一个定值(不要去纠结struct这种框架),所以空间换取时间的tradeoff是完全值得的。
in Thread.class:
ThreadLocal.ThreadLocalMap threadLocals = null; //Thread类中维护了一个属性threadLocals,这个变量是线程私有的。是一个通过数组实现的hash表
***threadLocals分析:
事实上是以Entry数组,类似ConcurrentHashMap的实现方式。
每一个元素均是一个键值对。
所以问题来了,什么是我们需要的键?最简单的回答是,每个要存储对象的地址可以作为键,存储到每个表的threadLocals对象中。现在将ThreadLocal对象的地址作为键。
虽然多个线程均使用了同一个Runnable对象(所以共享一个longLocal对象的地址),但是是将这个地址作为键存储到每个线程自己对应的表中!!!线程安全!
创建:
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue); //如果发现当前类还没有一个map容器用于存储数据,创建一个新的ThreadLocalMap,专门给当前线程使用
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY]; //创建一个Entry数组
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); //为当前thread所私有的
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
放入数据:
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); //先找到当前线程所对应的ThreadLocalMap对象threadLocals
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
根据ThreadLocal对象的地址取出数据:
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
----------------------------------------------------Wait/Notify--------------------------------------------------------------
wait, notify & notifyAll 三个方法均是Object类的方法。
wait()方法
定义在Object上的方法, 是java语言级的方法, 需要在同步块或者同步方法中进行调用, 会释放锁, 并进入锁对象的等待队列, 需要等到其他线程调用notify()方法释放锁后(实际上该线程同步块运行结束后才会释放锁), 重新竞争锁.
调用wait()方法后,线程由RUNNING变为WAITING,并将当前线程放置于对象等待队列
notify(),notifyAll()方法
调用后,等待线程依然不会从wait()方法返回,而是等调用notify(),notifyAll()的线程释放该锁之后,等待线程才有机会从wait()返回。
notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而 notifyAll()方法则是把所有等待线程从等待队列中移到同步队列中,被移动的线程的状态由WAITING变成BLOCKED。
个人总结:
对于一个临界资源,我们维护了三条列
========================== 使用队列->一般在正常的情况下,临界资源只会被一个线程使用。
========================== 阻塞队列->线程的状态是BLOCKED,是一种系统级别的线程状态。等待运行,说明此时的资源已经被释放。
========================== 等待队列->临界资源没有被释放,线程主动进入wait,等待被notify后进入阻塞队列被系统调用获取临界资源。***已测试:此时现成的状态仍然是Running的。
* The current thread must own this object's monitor. The thread
* releases ownership of this monitor and waits until another thread
* notifies threads waiting on this object's monitor to wake up
* either through a call to the {@code notify} method or the
* {@code notifyAll} method. The thread then waits until it can
* re-obtain ownership of the monitor and resumes execution.
* As in the one argument version, interrupts and spurious wakeups are
* possible, and this method should always be used in a loop:
* <pre>
* synchronized (obj) {
* while (<condition does not hold>) //因为是多线程抢占资源,所以在获得临界资源到据需运行的过程该值可能已经被修改了,利用while在进行一次判断
* obj.wait();
* ... // Perform action appropriate to condition
* }
一个线程从wait到运行的途径:
1.从等待队列进入阻塞队列,等待被CPU分配资源
2.从阻塞态进入运行态,获取资源并运行。
实例:
public class WaitNotifyTest {
private static Object cr = new Object();
private static Boolean block = true;
static class Wait implements Runnable{
public void run() {
synchronized (cr) { //获取了资源,但是可能当前资源还需要别的线程进行处理
while(block){ //判断当前的资源是不是已经和预期的一致。是的话可以跳出循环。
System.out.println(Thread.currentThread().getName() + " " + " entering waiting");
try {
cr.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " :do service"); //数据已经被处理,继续执行业务。
}
}
}
static class Notify implements Runnable{
public void run() {
synchronized (cr) {
cr.notifyAll(); //通知正在等待的队列资源已经准备完毕
System.out.println(Thread.currentThread().getName() + " " + "notify"); //此处仍没有释放临界资源
block = false; //修改条件,允许退出循环
}
}
}
public static void main(String[] args) throws InterruptedException {
Wait wait = new Wait();
new Thread(wait).start();
Thread.sleep(10);
Notify notify = new Notify();
new Thread(notify).start();
}
}
***现象:debug模式下synchronized代码块退出的时候会再次运行到synchronized块,应该是一个释放锁的过程。
实例:
public class WaitNotifyTest1 {
private static Integer mutex = 0;
static class Wait implements Runnable{
public void run() {
synchronized (mutex) {
while(mutex != 1){ //直到mutex变成1我们才会继续处理业务
System.out.println(Thread.currentThread().getName() + " " + " entering waiting");
try {
mutex.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " mutex= " + ++mutex);
}
}
}
static class Notify implements Runnable{
public void run() {
synchronized (mutex) {
mutex.notify();
System.out.println(Thread.currentThread().getName() + " " + "notify");
mutex = 1;
}
}
}
public static void main(String[] args) throws InterruptedException {
Wait wait = new Wait();
new Thread(wait).start();
Thread.sleep(10);
Notify notify = new Notify();
new Thread(notify).start();
}
}
notify & notifyAll:
相同:
都通知了等待该资源的线程,可以尝试有JVM分配资源和时间片。
不同:
notify只通知一个线程,notifyAll通知了所有的线程。
最重要的意义:
notify会造成死锁!!!思考多消费者读取某个临界资源,多生产者修改临界资源,某个线程可能永远都不会得到期望值从而死锁!
----------------------------------------------------Affinity-----------------------------------------------------------------
1.绑定亲和性可以让我们对线程的运行有更多的控制,保证了run-to-complete模型。
2.减少了频繁切换线程所造成的系统开销。
3.dpdk的运行模式。
----------------------------------------------------ThreadPool---------------------------------------------------------------
参考资料:http://www.cnblogs.com/dolphin0520/p/3932921.html
1.为什么需要线程池:
源码:java.lang.Thread.init(ThreadGroup, Runnable, String, long, AccessControlContext, boolean)
创建新的线程是需要很大系统开销的,首先需要开辟线程专用的栈空间,安全检查,进行底层调用(调用linux系统),所以我们需要一种在启动时就创建好一些线程,并且每个线程不会消亡,而是可以重复使用的方法。
2.使用java.util.concurrent.ThreadPoolExecutor类创建线程池:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
corePoolSize:核心池的大小。大概在线程池中一直存在等着被使用的线程数量。
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位
workQueue:一个阻塞队列,用来存储等待执行的任务
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
源码分析:
Constructor:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue; //private final BlockingQueue<Runnable> workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory; //private volatile ThreadFactory threadFactory;
this.handler = handler;
}
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //创建了一个原子变量确定当前线程池的状态,用于保证线程池状态对多个线程的可见性。
private static final int RUNNING = -1 << COUNT_BITS; //当创建线程池后,初始时,线程池处于RUNNING状态;
private static final int SHUTDOWN = 0 << COUNT_BITS; //如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
private static final int STOP = 1 << COUNT_BITS; //如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
private static final int TIDYING = 2 << COUNT_BITS; //线程池准备关闭之前的状态,其中已经没有线程正在运行。
private static final int TERMINATED = 3 << COUNT_BITS; //当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
实际运行的方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Worker是一个ThreadPoolExecutor的内部类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
实际上是内部维护的线程对象,也是线程业务的实际运行对象。
实例
****************************************************************直接使用ThreadPoolExecutor(不推荐)*****************************************************************
public class ThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));