|
1 | | -努力编写中... |
| 1 | +Semaphore 信号量,可用于控制一定时间内,并发执行的线程数,基于AQS实现。可应用于网关限流、资源限制 (如 最大可发起连接数)。由于 release() 释放许可时,未对释放许可数做限制,所以可以通过该方法增加总的许可数量。 |
| 2 | + |
| 3 | +**获取许可** 支持公平和非公平模式,默认非公平模式。公平模式无论是否有许可,都会先判断是否有线程在排队,如果有线程排队,则进入排队,否则尝试获取许可;非公平模式无论许可是否充足,直接尝试获取许可。 |
| 4 | + |
| 5 | +不多废话,下面直接看源码。 |
| 6 | + |
| 7 | +#### 核心内部类 Sync |
| 8 | +```java |
| 9 | +abstract static class Sync extends AbstractQueuedSynchronizer { |
| 10 | + |
| 11 | + private static final long serialVersionUID = 1192457210091910933L; |
| 12 | + |
| 13 | + /* 赋值state为总许可数 */ |
| 14 | + Sync(int permits) { |
| 15 | + setState(permits); |
| 16 | + } |
| 17 | + |
| 18 | + /* 剩余许可数 */ |
| 19 | + final int getPermits() { |
| 20 | + return getState(); |
| 21 | + } |
| 22 | + |
| 23 | + /* 自旋 + CAS非公平获取 */ |
| 24 | + final int nonfairTryAcquireShared(int acquires) { |
| 25 | + for (;;) { |
| 26 | + // 剩余可用许可数 |
| 27 | + int available = getState(); |
| 28 | + // 本次获取许可后,剩余许可 |
| 29 | + int remaining = available - acquires; |
| 30 | + // 如果获取后,剩余许可大于0,则CAS更新剩余许可,否则获取失败失败 |
| 31 | + if (remaining < 0 || |
| 32 | + compareAndSetState(available, remaining)) |
| 33 | + return remaining; |
| 34 | + } |
| 35 | + } |
| 36 | + |
| 37 | + /** |
| 38 | + * 自旋 + CAS 释放许可 |
| 39 | + * 由于未对释放许可数做限制,所以可以通过release动态增加许可数量 |
| 40 | + */ |
| 41 | + protected final boolean tryReleaseShared(int releases) { |
| 42 | + for (;;) { |
| 43 | + // 当前剩余许可 |
| 44 | + int current = getState(); |
| 45 | + // 许可更新值 |
| 46 | + int next = current + releases; |
| 47 | + // 如果许可更新值为负数,说明许可数量溢出,抛出错误 |
| 48 | + if (next < current) // overflow |
| 49 | + throw new Error("Maximum permit count exceeded"); |
| 50 | + // CAS更新许可数量 |
| 51 | + if (compareAndSetState(current, next)) |
| 52 | + return true; |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + /* 自旋 + CAS 减少许可数量 */ |
| 57 | + final void reducePermits(int reductions) { |
| 58 | + for (;;) { |
| 59 | + // 当前剩余许可 |
| 60 | + int current = getState(); |
| 61 | + // 更新值 |
| 62 | + int next = current - reductions; |
| 63 | + // 较少许可数错误,抛出异常 |
| 64 | + if (next > current) // underflow |
| 65 | + throw new Error("Permit count underflow"); |
| 66 | + // CAS更新许可数 |
| 67 | + if (compareAndSetState(current, next)) |
| 68 | + return; |
| 69 | + } |
| 70 | + } |
| 71 | + |
| 72 | + /* 丢弃所有许可 */ |
| 73 | + final int drainPermits() { |
| 74 | + for (;;) { |
| 75 | + int current = getState(); |
| 76 | + if (current == 0 || compareAndSetState(current, 0)) |
| 77 | + return current; |
| 78 | + } |
| 79 | + } |
| 80 | +} |
| 81 | + |
| 82 | +/** |
| 83 | + * 非公平模式 |
| 84 | + */ |
| 85 | +static final class NonfairSync extends Sync { |
| 86 | + private static final long serialVersionUID = -2694183684443567898L; |
| 87 | + |
| 88 | + NonfairSync(int permits) { |
| 89 | + super(permits); |
| 90 | + } |
| 91 | + |
| 92 | + protected int tryAcquireShared(int acquires) { |
| 93 | + return nonfairTryAcquireShared(acquires); |
| 94 | + } |
| 95 | +} |
| 96 | + |
| 97 | +/** |
| 98 | + * 公平模式 |
| 99 | + */ |
| 100 | +static final class FairSync extends Sync { |
| 101 | + private static final long serialVersionUID = 2014338818796000944L; |
| 102 | + |
| 103 | + FairSync(int permits) { |
| 104 | + super(permits); |
| 105 | + } |
| 106 | + |
| 107 | + /** |
| 108 | + * 公平模式获取许可 |
| 109 | + * 公平模式不论许可是否充足,都会判断同步队列中是否有线程在等地,如果有,获取失败,排队阻塞 |
| 110 | + */ |
| 111 | + protected int tryAcquireShared(int acquires) { |
| 112 | + for (;;) { |
| 113 | + // 如果有线程在排队,立即返回 |
| 114 | + if (hasQueuedPredecessors()) |
| 115 | + return -1; |
| 116 | + // 自旋 + cas获取许可 |
| 117 | + int available = getState(); |
| 118 | + int remaining = available - acquires; |
| 119 | + if (remaining < 0 || |
| 120 | + compareAndSetState(available, remaining)) |
| 121 | + return remaining; |
| 122 | + } |
| 123 | + } |
| 124 | +} |
| 125 | +``` |
| 126 | + |
| 127 | +#### 主要API |
| 128 | +```java |
| 129 | +public class Semaphore implements java.io.Serializable { |
| 130 | + |
| 131 | + private static final long serialVersionUID = -3222578661600680210L; |
| 132 | + |
| 133 | + /** All mechanics via AbstractQueuedSynchronizer subclass */ |
| 134 | + private final Sync sync; |
| 135 | + |
| 136 | + /** |
| 137 | + * 根据给定的 总许可数permits,创建 Semaphore |
| 138 | + */ |
| 139 | + public Semaphore(int permits) { |
| 140 | + sync = new NonfairSync(permits); |
| 141 | + } |
| 142 | + |
| 143 | + /** |
| 144 | + * fair为true表示使用公平锁模式,false使用非公平锁 |
| 145 | + */ |
| 146 | + public Semaphore(int permits, boolean fair) { |
| 147 | + sync = fair ? new FairSync(permits) : new NonfairSync(permits); |
| 148 | + } |
| 149 | + |
| 150 | + // --------------------- 获取许可 -------------------- |
| 151 | + |
| 152 | + /* 获取指定数量的许可 */ |
| 153 | + public void acquire(int permits) throws InterruptedException { |
| 154 | + if (permits < 0) throw new IllegalArgumentException(); |
| 155 | + sync.acquireSharedInterruptibly(permits); |
| 156 | + } |
| 157 | + |
| 158 | + /* 获取一个许可 */ |
| 159 | + public void acquire() throws InterruptedException { |
| 160 | + sync.acquireSharedInterruptibly(1); |
| 161 | + } |
| 162 | + |
| 163 | + public final void acquireSharedInterruptibly(int arg) |
| 164 | + throws InterruptedException { |
| 165 | + if (Thread.interrupted()) |
| 166 | + throw new InterruptedException(); |
| 167 | + if (tryAcquireShared(arg) < 0) // 获取许可,剩余许可>=0,则获取许可成功,<0获取许可失败,进入排队 |
| 168 | + doAcquireSharedInterruptibly(arg); |
| 169 | + } |
| 170 | + |
| 171 | + protected int tryAcquireShared(int acquires) { |
| 172 | + return nonfairTryAcquireShared(acquires); |
| 173 | + } |
| 174 | + |
| 175 | + /** |
| 176 | + * @return 剩余许可数量。非负数,获取许可成功,负数,获取许可失败 |
| 177 | + */ |
| 178 | + final int nonfairTryAcquireShared(int acquires) { |
| 179 | + for (;;) { |
| 180 | + int available = getState(); |
| 181 | + int remaining = available - acquires; |
| 182 | + if (remaining < 0 || |
| 183 | + compareAndSetState(available, remaining)) |
| 184 | + return remaining; |
| 185 | + } |
| 186 | + } |
| 187 | + |
| 188 | + /** |
| 189 | + * 获取许可失败,当前线程进入同步队列,排队阻塞 |
| 190 | + */ |
| 191 | + private void doAcquireSharedInterruptibly(int arg) |
| 192 | + throws InterruptedException { |
| 193 | + // 创建同步队列节点,并入队 |
| 194 | + final Node node = addWaiter(Node.SHARED); |
| 195 | + boolean failed = true; |
| 196 | + try { |
| 197 | + for (;;) { |
| 198 | + // 如果当前节点是第二个节点,尝试获取锁 |
| 199 | + final Node p = node.predecessor(); |
| 200 | + if (p == head) { |
| 201 | + int r = tryAcquireShared(arg); |
| 202 | + if (r >= 0) { |
| 203 | + setHeadAndPropagate(node, r); |
| 204 | + p.next = null; // help GC |
| 205 | + failed = false; |
| 206 | + return; |
| 207 | + } |
| 208 | + } |
| 209 | + // 阻塞当前线程 |
| 210 | + if (shouldParkAfterFailedAcquire(p, node) && |
| 211 | + parkAndCheckInterrupt()) |
| 212 | + throw new InterruptedException(); |
| 213 | + } |
| 214 | + } finally { |
| 215 | + if (failed) |
| 216 | + cancelAcquire(node); |
| 217 | + } |
| 218 | + } |
| 219 | + |
| 220 | + // --------------------- 释放归还许可 ------------------------- |
| 221 | + |
| 222 | + /* 释放指定数量的许可 */ |
| 223 | + public void release(int permits) { |
| 224 | + if (permits < 0) throw new IllegalArgumentException(); |
| 225 | + sync.releaseShared(permits); |
| 226 | + } |
| 227 | + |
| 228 | + /* 释放一个许可 */ |
| 229 | + public void release() { |
| 230 | + sync.releaseShared(1); |
| 231 | + } |
| 232 | + |
| 233 | + public final boolean releaseShared(int arg) { |
| 234 | + // 归还许可成功 |
| 235 | + if (tryReleaseShared(arg)) { |
| 236 | + doReleaseShared(); |
| 237 | + return true; |
| 238 | + } |
| 239 | + return false; |
| 240 | + } |
| 241 | + |
| 242 | + /** |
| 243 | + * 释放许可 |
| 244 | + * 由于未对释放许可数做限制,所以可以通过release动态增加许可数量 |
| 245 | + */ |
| 246 | + protected final boolean tryReleaseShared(int releases) { |
| 247 | + for (;;) { |
| 248 | + int current = getState(); |
| 249 | + int next = current + releases; |
| 250 | + if (next < current) // overflow |
| 251 | + throw new Error("Maximum permit count exceeded"); |
| 252 | + if (compareAndSetState(current, next)) |
| 253 | + return true; |
| 254 | + } |
| 255 | + } |
| 256 | + |
| 257 | + private void doReleaseShared() { |
| 258 | + // 自旋,唤醒等待的第一个线程(其他线程将由第一个线程向后传递唤醒) |
| 259 | + for (;;) { |
| 260 | + Node h = head; |
| 261 | + if (h != null && h != tail) { |
| 262 | + int ws = h.waitStatus; |
| 263 | + if (ws == Node.SIGNAL) { |
| 264 | + if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) |
| 265 | + continue; // loop to recheck cases |
| 266 | + // 唤醒第一个等待线程 |
| 267 | + unparkSuccessor(h); |
| 268 | + } |
| 269 | + else if (ws == 0 && |
| 270 | + !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) |
| 271 | + continue; // loop on failed CAS |
| 272 | + } |
| 273 | + if (h == head) // loop if head changed |
| 274 | + break; |
| 275 | + } |
| 276 | + } |
| 277 | +} |
| 278 | +``` |
0 commit comments