Skip to content

AQS 共享模式中 PROPAGATE 示例推演疑似遗漏新 head.waitStatus 判断 #2861

@Fisherder

Description

@Fisherder

问题位置

文档路径:docs/java/concurrent/aqs.md

相关位置:

  • setHeadAndPropagate() 源码及条件解释:
    因此在共享模式下,当线程线程被唤醒之后,获取到了资源,如果发现还存在剩余资源,就会尝试唤醒后边的线程去尝试获取资源。对应的 `setHeadAndPropagate()` 方法如下:
    ```JAVA
    // AQS
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // 1、将当前线程节点移出等待队列。
    setHead(node);
    // 2、唤醒后续等待节点。
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    doReleaseShared();
    }
    }
    ```
    `setHeadAndPropagate()` 方法中,唤醒后续节点需要满足一定的条件,主要需要满足 2 个条件:
    - `propagate > 0``propagate` 代表获取资源之后剩余的资源数量,如果 `> 0` ,则可以唤醒后续线程去获取资源。
    - `h.waitStatus < 0` :这里的 `h` 节点是执行 `setHead()` 之前的 `head` 节点。判断 `head.waitStatus` 时使用 `< 0` ,主要为了确定 `head` 节点的状态为 `SIGNAL``PROPAGATE` 。如果 `head` 节点为 `SIGNAL` ,则可以唤醒后续节点;如果 `head` 节点状态为 `PROPAGATE` ,也可以唤醒后续节点(这是为了解决并发场景下出现的问题,后续会细讲)。
    代码中关于 **唤醒后续等待节点**`if` 判断稍微复杂一些,这里来讲一下为什么这样写:
    ```JAVA
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0)
    ```
    - `h == null || h.waitStatus < 0``h == null` 用于防止空指针异常。正常情况下 h 不会为 `null` ,因为执行到这里之前,当前节点已经加入到队列中了,队列不可能还没有初始化。
    `h.waitStatus < 0` 主要判断 `head` 节点的状态是否为 `SIGNAL` 或者 `PROPAGATE` ,直接使用 `< 0` 来判断比较方便。
    - `(h = head) == null || h.waitStatus < 0` :如果到这里说明之前判断的 `h.waitStatus < 0` ,说明存在并发。
    同时存在其他线程在唤醒后续节点,已经将 `head` 节点的值由 `SIGNAL` 修改为 `0` 了。因此,这里重新获取新的 `head` 节点,这次获取的 `head` 节点为通过 `setHead()` 设置的当前线程节点,之后再次判断 `waitStatus` 状态。
  • doReleaseShared()PROPAGATE 说明:
    private void doReleaseShared() {
    for (;;) {
    Node h = head;
    // 1、队列中至少需要一个等待的线程节点。
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    // 2、如果 head 节点的状态为 SIGNAL,则可以唤醒后继节点。
    if (ws == Node.SIGNAL) {
    // 2.1 清除 head 节点的 SIGNAL 状态,更新为 0。表示已经唤醒该节点的后继节点了。
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue;
    // 2.2 唤醒后继节点
    unparkSuccessor(h);
    }
    // 3、如果 head 节点的状态为 0,则更新为 PROPAGATE。这是为了解决并发场景下存在的问题,接下来会细讲。
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;
    }
    if (h == head)
    break;
    }
    }
    ```
    `doReleaseShared()` 方法中,会判断 `head` 节点的 `waitStatus` 状态来决定接下来的操作,有两种情况:
    - `head` 节点的状态为 `SIGNAL` :表明 `head` 节点存在后继节点需要唤醒,因此通过 `CAS` 操作将 `head` 节点的 `SIGNAL` 状态更新为 `0` 。通过清除 `SIGNAL` 状态来表示已经对 `head` 节点的后继节点进行唤醒操作了。
    - `head` 节点的状态为 `0` :表明存在并发情况,需要将 `0` 修改为 `PROPAGATE` 来保证在并发场景下可以正常唤醒线程。
    #### 为什么需要 `PROPAGATE` 状态?
    `doReleaseShared()` 释放资源时,第 3 步不太容易理解,即如果发现 `head` 节点的状态是 `0` ,就将 `head` 节点的状态由 `0` 更新为 `PROPAGATE`
    AQS 中,Node 节点的 `PROPAGATE` 就是为了处理并发场景下可能出现的无法唤醒线程节点的问题。`PROPAGATE` 只在 `doReleaseShared()` 方法中用到一次。
  • “为什么需要 PROPAGATE 状态?”示例推演:
    **接下来通过案例分析,为什么需要 `PROPAGATE` 状态?**
    在共享模式下,线程获取和释放资源的方法调用链如下:
    - 线程获取资源的方法调用链为: `acquireShared() -> tryAcquireShared() -> 线程阻塞等待唤醒 -> tryAcquireShared() -> setHeadAndPropagate() -> if (剩余资源数 > 0) || (head.waitStatus < 0) 则唤醒后续节点`
    - 线程释放资源的方法调用链为: `releaseShared() -> tryReleaseShared() -> doReleaseShared()`
    **如果在释放资源时,没有将 `head` 节点的状态由 `0` 改为 `PROPAGATE`**
    假设总共有 4 个线程尝试以共享模式获取资源,总共有 2 个资源。初始 `T3``T4` 线程获取到了资源,`T1``T2` 线程没有获取到,因此在队列中排队等候。
    - 在时刻 1 时,线程 `T1``T2` 在等待队列中,`T3``T4` 持有资源。此时等待队列内节点以及对应状态为(括号内为节点的 `waitStatus` 状态):
    `head(-1) -> T1(-1) -> T2(0)`
    - 在时刻 2 时,线程 `T3` 释放资源,通过 `doReleaseShared()` 方法将 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T1` ,之后线程 `T3` 退出。
    线程 `T1` 被唤醒之后,通过 `tryAcquireShared()` 获取到资源,但是此时还未来得及执行 `setHeadAndPropagate()` 将自己设置为 `head` 节点。此时等待队列内节点状态为:
    `head(0) -> T1(-1) -> T2(0)`
    - 在时刻 3 时,线程 `T4` 释放资源, 由于此时 `head` 节点的状态为 `0` ,因此在 `doReleaseShared()` 方法中无法唤醒 `head` 的后继节点, 之后线程 `T4` 退出。
    - 在时刻 4 时,线程 `T1` 继续执行 `setHeadAndPropagate()` 方法将自己设置为 `head` 节点。
    但是此时由于线程 `T1` 执行 `tryAcquireShared()` 方法返回的剩余资源数为 `0` ,并且 `head` 节点的状态为 `0` ,因此线程 `T1` 并不会在 `setHeadAndPropagate()` 方法中唤醒后续节点。此时等待队列内节点状态为:
    `head(-1,线程 T1 节点) -> T2(0)`
    此时,就导致线程 `T2` 节点在等待队列中,无法被唤醒。对应时刻表如下:
    | 时刻 | 线程 T1 | 线程 T2 | 线程 T3 | 线程 T4 | 等待队列 |
    | ------ | -------------------------------------------------------------- | -------- | ---------------- | ------------------------------------------------------------- | --------------------------------- |
    | 时刻 1 | 等待队列 | 等待队列 | 持有资源 | 持有资源 | `head(-1) -> T1(-1) -> T2(0)` |
    | 时刻 2 | (执行)被唤醒后,获取资源,但未来得及将自己设置为 `head` 节点 | 等待队列 | (执行)释放资源 | 持有资源 | `head(0) -> T1(-1) -> T2(0)` |
    | 时刻 3 | | 等待队列 | 已退出 | (执行)释放资源。但 `head` 节点状态为 `0` ,无法唤醒后继节点 | `head(0) -> T1(-1) -> T2(0)` |
    | 时刻 4 | (执行)将自己设置为 `head` 节点 | 等待队列 | 已退出 | 已退出 | `head(-1,线程 T1 节点) -> T2(0)` |
    **如果在线程释放资源时,将 `head` 节点的状态由 `0` 改为 `PROPAGATE` ,则可以解决上边出现的并发问题,如下:**
    - 在时刻 1 时,线程 `T1``T2` 在等待队列中,`T3``T4` 持有资源。此时等待队列内节点以及对应状态为:
    `head(-1) -> T1(-1) -> T2(0)`
    - 在时刻 2 时,线程 `T3` 释放资源,通过 `doReleaseShared()` 方法将 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T1` ,之后线程 `T3` 退出。
    线程 `T1` 被唤醒之后,通过 `tryAcquireShared()` 获取到资源,但是此时还未来得及执行 `setHeadAndPropagate()` 将自己设置为 `head` 节点。此时等待队列内节点状态为:
    `head(0) -> T1(-1) -> T2(0)`
    - 在时刻 3 时,线程 `T4` 释放资源, 由于此时 `head` 节点的状态为 `0` ,因此在 `doReleaseShared()` 方法中会将 `head` 节点的状态由 `0` 更新为 `PROPAGATE` , 之后线程 `T4` 退出。此时等待队列内节点状态为:
    `head(PROPAGATE) -> T1(-1) -> T2(0)`
    - 在时刻 4 时,线程 `T1` 继续执行 `setHeadAndPropagate()` 方法将自己设置为 `head` 节点。此时等待队列内节点状态为:
    `head(-1,线程 T1 节点) -> T2(0)`
    - 在时刻 5 时,虽然此时由于线程 `T1` 执行 `tryAcquireShared()` 方法返回的剩余资源数为 `0` ,但是 `head` 节点状态为 `PROPAGATE < 0` (这里的 `head` 节点是老的 `head` 节点,而不是刚成为 `head` 节点的线程 `T1` 节点)。
    因此线程 `T1` 会在 `setHeadAndPropagate()` 方法中唤醒后续 `T2` 节点,并将 `head` 节点的状态由 `SIGNAL` 更新为 `0`。此时等待队列内节点状态为:
    `head(0,线程 T1 节点) -> T2(0)`
    - 在时刻 6 时,线程 `T2` 被唤醒后,获取到资源,并将自己设置为 `head` 节点。此时等待队列内节点状态为:
    `head(0,线程 T2 节点)`
    有了 `PROPAGATE` 状态,就可以避免线程 `T2` 无法被唤醒的情况。对应时刻表如下:
    | 时刻 | 线程 T1 | 线程 T2 | 线程 T3 | 线程 T4 | 等待队列 |
    | ------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | ---------------- | ------------------------------------------------------------------- | ------------------------------------ |
    | 时刻 1 | 等待队列 | 等待队列 | 持有资源 | 持有资源 | `head(-1) -> T1(-1) -> T2(0)` |
    | 时刻 2 | (执行)被唤醒后,获取资源,但未来得及将自己设置为 `head` 节点 | 等待队列 | (执行)释放资源 | 持有资源 | `head(0) -> T1(-1) -> T2(0)` |
    | 时刻 3 | 未继续向下执行 | 等待队列 | 已退出 | (执行)释放资源。此时会将 `head` 节点状态由 `0` 更新为 `PROPAGATE` | `head(PROPAGATE) -> T1(-1) -> T2(0)` |
    | 时刻 4 | (执行)将自己设置为 `head` 节点 | 等待队列 | 已退出 | 已退出 | `head(-1,线程 T1 节点) -> T2(0)` |
    | 时刻 5 | (执行)由于 `head` 节点状态为 `PROPAGATE < 0` ,因此会在 `setHeadAndPropagate()` 方法中唤醒后续节点,此时将新的 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T2` | 等待队列 | 已退出 | 已退出 | `head(0,线程 T1 节点) -> T2(0)` |
    | 时刻 6 | 已退出 | (执行)线程 `T2` 被唤醒后,获取到资源,并将自己设置为 `head` 节点 | 已退出 | 已退出 | `head(0,线程 T2 节点)` |

疑似问题

文档在解释 AQS 共享模式下 PROPAGATE 的作用时,给出了一个示例:共享资源数为 2,T3T4 已经获取资源,T1T2 在 AQS 队列中等待。

其中有一步推演大致为:

  • 初始队列:head(-1) -> T1(-1) -> T2(0)
  • T3 释放资源,将 head.waitStatusSIGNAL(-1) 改为 0,并唤醒 T1
  • T1 被唤醒后成功 tryAcquireShared(),但还没有执行 setHeadAndPropagate()
  • T4 此时释放资源,由于当前 head.waitStatus == 0,所以 doReleaseShared() 无法唤醒后继;
  • 随后 T1 执行 setHeadAndPropagate(),文档推演认为由于 propagate == 0head.waitStatus == 0,所以不会继续唤醒 T2,导致 T2 无法被唤醒。

这里我理解可能有一点遗漏:setHeadAndPropagate() 并不是只判断 propagate > 0 和旧 head.waitStatus < 0,它还会在 setHead(node) 之后重新读取新的 head 并再次判断 waitStatus < 0

源码依据

以 OpenJDK 8u 中的 AbstractQueuedSynchronizer#setHeadAndPropagate() 为例,关键判断如下:

if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
        doReleaseShared();
}

源码位置:https://github.com/openjdk/jdk8u/blob/master/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java#L708-L731

这里包含两次 head 判断:

  • 第一次 h.waitStatus < 0 判断的是 setHead(node) 之前保存的旧 head
  • 第二次 (h = head) == null || h.waitStatus < 0 判断的是 setHead(node) 之后重新读取到的新 head

具体推演

如果按照文档示例,在 T1 执行 setHeadAndPropagate() 前队列是:

head(0) -> T1(-1) -> T2(0)

那么 T1 执行 setHeadAndPropagate(T1, propagate) 时:

  1. 先保存旧 head,此时旧 head.waitStatus == 0
  2. 执行 setHead(T1)T1 成为新的 head
  3. 此时新的 head 就是原来的 T1 节点;
  4. 示例中原 T1 节点的 waitStatus-1
  5. 所以重新判断 (h = head).waitStatus < 0 时条件成立;
  6. 于是会进入 if
  7. 随后判断 Node s = node.next,也就是 T1.next
  8. 如果 T2 是共享节点,则会调用 doReleaseShared()
  9. doReleaseShared() 会继续尝试唤醒 T2

也就是说,在这个状态下:

head(0) -> T1(-1) -> T2(0)

T1 后续执行 setHeadAndPropagate() 时,似乎不会因为 propagate == 0 且旧 head.waitStatus == 0 就停止传播。因为方法还会检查更新后的新 head,也就是原来的 T1 节点;如果 T1.waitStatus == -1,那么新 head.waitStatus < 0 条件仍然成立。

因此,文档中“由于 propagate == 0head.waitStatus == 0,所以 T1 不会在 setHeadAndPropagate() 中唤醒后继节点,导致 T2 无法唤醒”的推演,疑似遗漏了 setHead(node) 之后对新 head.waitStatus 的再次判断。

以上是我阅读源码后的理解,也可能是我对该并发场景的理解存在偏差。如果有理解不准确的地方,也欢迎指正。

感谢维护者整理 JavaGuide。

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions