-
Notifications
You must be signed in to change notification settings - Fork 0
Description
概述
Java并发包里面所有的阻塞锁和同步的抽象模板类AbstractQueuedSynchronizer,主要依赖FIFO等待队列来实现,所有底层的重入锁和同步类都是从这个类派生的,所以理解这个类最关键,不过这篇文章主要结合重入锁进行开始分析
设计要点
AQS内部维护了一个阻塞队列,名字叫CLH(取自三个人的第一个字母),先来看下这个类的几个重要类
- Node
队列的节点,每个节点内部持有一个线程属性,和对应的前驱和后续节点,还有个节点的标志是否是共享模式还是排他模式
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node(); //表示是一个共享节点
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null; //表示是一个排他节点
volatile int waitStatus; //代表的是后续节点的等待状态,不是当前节点
volatile Node prev; //链表的前驱节点
volatile Node next;//链表的后续节点
volatile Thread thread; //代表当前节点对应的线程
Node nextWaiter; 等待节点- AbstractQueuedSynchronizer
这个是所有并发包的基础类,最重要也就是下面几个属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;//等待队列的头节点
private transient volatile Node tail;//等待队列的尾结点
private volatile int state;//同步状态
private transient Thread exclusiveOwnerThread;//这个是父类的属性,代表是当前拥有排他的线程
}由于这个类里面其实是个模板类,所以必须要结合子类,才能读懂里面的代码,先从简单的子类重入锁类开始吧
- ReetrantLock
这个重入锁有2种模式,公平模式和非公平模式,有FairSync和NonfairSync实现,这2个都是继承Sync类,而Sync又继承AbstractQueuedSynchronizer,而Sync类本身又是一个模板类,本身即实现了AQS的tryRelease模板方法,同时提供了lock模板方法。
分析下FairSync的lock方法
static final class FairSync extends Sync {
final void lock() {
acquire(1);//这里调用了父类的acquire方法
}看下父类的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}//这里调用了tryAcquire方法,这个方法其实是个模板方法,所以还是回到子类看这个实现,继续看子类
FairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();//获取当前线程
int c = getState();//获取同步状态,如果是0,那么说明
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}可以看到其逻辑如下
- 获取当前线程
- 获取当前同步状态
- 如果当前同步状态为0,说明还没有线程获取锁
- 继续判断是否有等待的节点,如果没有继续等待的节点,并且设置同步状态为1成功,同时设置当前线程为排他线程
- 如果同步状态不为0,说明有线程持有锁了,判断是否是当前线程获取了锁,如果是,对同步状态加1操作
- 否则返回false
看到这里如果tryAcquire返回false,接着调用acquireQueued(addWaiter(Node.EXCLUSIVE)),这个方法有点难懂,需要先从addWaiter开始。
private Node addWaiter(Node node){
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}这个就是典型的添加节点到队列的逻辑,使用CAS加死循环,保证多线程的原子性等,入队的方法看enq(node)这部分代码
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//创建head节点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
} 这里首先判断tail是否为空,如果为空,那么说明还没有初始化链表,也就是先cas创建head节点,然后设置tail等于head,否则如果已经初始化过,那么先设置node的前驱节点为tail,并cas设置tail为自己,如果设置成功,那么设置tail的next为自,不成功说明有别的线程设置成功,那么继续循环,直到成功为止。
addWaiter最后返回node作为acquireQueued的方法入口参数,并传入一个参数(仍是1),看看它里面做了什么操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}这里也是一个死循环,除非进入if(p == head &&tryAcquire(arg))这个判定条件,而p为node.predcessor()得到,这个方法返回node节点的前一个节点,也就是说只有当前一个节点是head的时候,进一步尝试通过tryAcquire(arg)来征用才有机会成功。tryAcquire(arg)这个方法我们前面介绍过,成立的条件为:锁的状态为0,且通过CAS尝试设置状态成功或线程的持有者本身是当前线程才会返回true,我们现在来详细拆分这部分代码。
- 如果这个条件成功后,发生的几个动作包含:
- 首先调用setHead(Node)的操作,这个操作内部会将传入的node节点作为AQS的head所指向的节点。线程属性设置为空(因为现在已经获取到锁,不再需要记录下这个节点所对应的线程了),再将这个节点的perv引用赋值为null。
- 进一步将的前一个节点的next引用赋值为null。
在进行了这样的修改后,队列的结构就变成了以下这种情况了,通过这样的方式,就可以让执行完的节点释放掉内存区域,而不是无限制增长队列,也就真正形成FIFO了:
- 如果这个判定条件失败
会首先判定:“shouldParkAfterFailedAcquire(p , node)”,这个方法内部会判定前一个节点的状态是否为:“Node.SIGNAL”,若是则返回true,若不是都会返回false,不过会再做一些操作:判定节点的状态是否大于0,若大于0则认为被“CANCELLED”掉了(我们没有说明几个状态的值,不过大于0的只可能被CANCELLED的状态),因此会从前一个节点开始逐步循环找到一个没有被“CANCELLED”节点,然后与这个节点的next、prev的引用相互指向;如果前一个节点的状态不是大于0的,则通过CAS尝试将状态修改为“Node.SIGNAL”,自然的如果下一轮循环的时候会返回值应该会返回true。
如果这个方法返回了true,则会执行:“parkAndCheckInterrupt()”方法,它是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。
相应的,可以自己看看FairSync实现类的lock方法,其实区别不大,有些细节上的区别可能会决定某些特定场景的需求,你也可以自己按照这样的思路去实现一个自定义的锁。
接下来简单看看unlock()解除锁的方式,如果获取到了锁不释放,那自然就成了死锁,所以必须要释放,来看看它内部是如何释放的。同样从排它锁(ReentrantLock)中的unlock()方法开始,请先看下面的代码:
public void unlock(){
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。
在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。
成功释放锁owner线程后,会继续把head节点的下一个节点的线程唤起,如果下一个节点为空,或者是下一个节点的waitStatus大于0, 则需要从tail节点开始往前查找节点的waitstatus小于等于0,找到后调用LockSupport.unpark()进行唤起。
至此整个FairSync的代码分析完了,非公平锁的唯一区别就是先直接设置同步状态,如果设置成功,那么些如当前线程为排他线程。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}这里compareAndSetState(0, 1) 就是直接设置为1,如果设置不成功,在调用acquire(1),其实就是进入排队了,另外tryAcquire则是调用父类的nonfairTryAcquire,不过这里为啥要放在父类,更合理的是把nonfairTryAcquire这个方法内部逻辑直接放在NonfairSync的tryAcquire的方法里面,更加符合OO原则,别的逻辑和FairSync一样了,就不在分析了。
补充
关于Lock及AQS的一些补充:
- Lock的操作不仅仅局限于lock()/unlock(),因为这样线程可能进入WAITING状态,这个时候如果没有unpark()就没法唤醒它,可能会一直“睡”下去,可以尝试用tryLock()、tryLock(long , TimeUnit)来做一些尝试加锁或超时来满足某些特定场景的需要。例如有些时候发现尝试加锁无法加上,先释放已经成功对其它对象添加的锁,过一小会再来尝试,这样在某些场合下可以避免“死锁”哦。
- lockInterruptibly() 它允许抛出InterruptException异常,也就是当外部发起了中断操作,程序内部有可能会抛出这种异常,但是并不是绝对会抛出异常的,大家仔细看看代码便清楚了。
- newCondition()操作,是返回一个Condition的对象,Condition只是一个接口,它要求实现await()、awaitUninterruptibly()、awaitNanos(long)、await(long , TimeUnit)、awaitUntil(Date)、signal()、signalAll()方法,AbstractQueuedSynchronizer中有一个内部类叫做ConditionObject实现了这个接口,它也是一个类似于队列的实现,具体可以参考源码。大多数情况下可以直接使用,当然觉得自己比较牛逼的话也可以参考源码自己来实现。
- 在AQS的Node中有每个Node自己的状态(waitStatus),我们这里归纳一下,分别包含:
- SIGNAL 从前面的代码状态转换可以看得出是前面有线程在运行,需要前面线程结束后,调用unpark()方法才能激活自己,值为:-1
- CANCELLED 当AQS发起取消或fullyRelease()时,会是这个状态。值为1,也是几个状态中唯一一个大于0的状态,所以前面判定状态大于0就基本等价于是CANCELLED的意思。
- CONDITION 线程基于Condition对象发生了等待,进入了相应的队列,自然也需要Condition对象来激活,值为-2。
- PROPAGATE 读写锁中,当读锁最开始没有获取到操作权限,得到后会发起一个doReleaseShared()动作,内部也是一个循环,当判定后续的节点状态为0时,尝试通过CAS自旋方式将状态修改为这个状态,表示节点可以运行。
状态0 初始化状态,也代表正在尝试去获取临界资源的线程所对应的Node的状态

