|
3 | 3 | * 通过CAS实现无锁非阻塞 |
4 | 4 | >基于链接节点的、无界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。 |
5 | 5 |
|
| 6 | +## Compare and swap(CAS) |
| 7 | +* CAS是一种替代锁存在的无锁并发机制。 |
| 8 | +* 该种无锁机制也是乐观锁的体现。 |
| 9 | +* Why need CAS:一个修改操作可以分为:从内存中读取值->修改值->写回内存,在高并发的过程中,这一个过程失去了其原子性,所以修改值的原始值可能已经被其他线程修改了。 |
| 10 | + i != i |
| 11 | +这种操作在单线程中一定是true,在多线程中i的值可能在别的线程被更改。 |
| 12 | +* 如何实现CAS:在循环中读取这次循环的内存中存储值,进行修改,在写回内存中时比较内存中的值是否还是第一步取出的值,不是的话说明这次循环失败,重新开始,是的话写入新的值。 |
| 13 | +```Java |
| 14 | +private static final Unsafe unsafe = Unsafe.getUnsafe(); //JNI实现的Unsafe类。 |
| 15 | +public final boolean compareAndSet(long expect, long update) { |
| 16 | + return unsafe.compareAndSwapLong(this, valueOffset, expect, update); |
| 17 | +} |
| 18 | +``` |
| 19 | + |
6 | 20 | ## Node |
7 | 21 | * ConcurrentLinkedQueue的内部私有类 |
8 | 22 | * 使用了通过C++编写的Unsafe类 |
|
17 | 31 | Node(E item) { |
18 | 32 | UNSAFE.putObject(this, itemOffset, item); |
19 | 33 | } |
| 34 | + //通过CAS保证了操作的原子性。 |
| 35 | + //CAS应用:表示设置当前Node的item值。第一个参数为期望值,第二个参数为设置目标值。当当前值等于期望值时(就是没有被其他人改过),就会将目标设置为val。 |
20 | 36 | boolean casItem(E cmp, E val) { |
21 | 37 | return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); |
22 | 38 | } |
23 | 39 | void lazySetNext(Node<E> val) { |
24 | 40 | UNSAFE.putOrderedObject(this, nextOffset, val); |
25 | 41 | } |
| 42 | + //CAS 下一个结点 |
26 | 43 | boolean casNext(Node<E> cmp, Node<E> val) { |
27 | 44 | return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); |
28 | 45 | } |
|
46 | 63 | ``` |
47 | 64 |
|
48 | 65 | ## ConcurrentLinkedQueue's API |
| 66 | +### 插入元素 |
49 | 67 | * add() 通过offer方法添加元素。 |
50 | 68 | ```Java |
51 | 69 | public boolean add(E e) { |
|
54 | 72 | ``` |
55 | 73 |
|
56 | 74 | * offer() |
| 75 | + |
| 76 | + |
57 | 77 | ```Java |
58 | 78 | public boolean offer(E e) { |
59 | 79 | checkNotNull(e); |
|
62 | 82 | for (Node<E> t = tail, p = t;;) { |
63 | 83 | Node<E> q = p.next; |
64 | 84 | if (q == null) { |
65 | | - // p is last node |
66 | | - if (p.casNext(null, newNode)) { |
| 85 | + // 当尾结点就是指向链表的最后一个元素,尝试CAS新的结点 |
| 86 | + if (p.casNext(null, newNode)) { //将新的结点添加到链表的底部 |
67 | 87 | // Successful CAS is the linearization point |
68 | 88 | // for e to become an element of this queue, |
69 | 89 | // and for newNode to become "live". |
70 | | - if (p != t) // hop two nodes at a time |
| 90 | + if (p != t) // 如上图所示,如果当前p不是tail结点,就将新接入的结点当做尾结点。相当于尾结点一次性跳跃两个结点。 |
71 | 91 | casTail(t, newNode); // Failure is OK. |
72 | 92 | return true; |
73 | 93 | } |
74 | 94 | // Lost CAS race to another thread; re-read next |
75 | 95 | } |
76 | | - else if (p == q) |
| 96 | + else if (p == q) //哨兵结点情况 |
77 | 97 | // We have fallen off list. If tail is unchanged, it |
78 | 98 | // will also be off-list, in which case we need to |
79 | 99 | // jump to head, from which all live nodes are always |
80 | 100 | // reachable. Else the new tail is a better bet. |
| 101 | + //如果在读取中,t的值被更新了,“打赌”t被更新正确了,不然重新从头开始。 |
81 | 102 | p = (t != (t = tail)) ? t : head; |
82 | | - else |
| 103 | + else //q不是最后一个结点 |
83 | 104 | // Check for tail updates after two hops. |
| 105 | + //如果在该操作过程中,t的值被更新了,我们“打赌”t的值被更新正确 |
| 106 | + //不然我们将p指针向后移动一位,重新进入循环 |
84 | 107 | p = (p != t && t != (t = tail)) ? t : q; |
85 | 108 | } |
86 | 109 | } |
| 110 | +``` |
| 111 | +[高效读写的队列:深度剖析ConcurrentLinkedQueue](https://blog.csdn.net/chenguibao/article/details/51773322) |
| 112 | + |
| 113 | +### 出队列 |
| 114 | + |
| 115 | +* poll() |
| 116 | +```Java |
| 117 | + public E poll() { |
| 118 | + restartFromHead: |
| 119 | + for (;;) { |
| 120 | + for (Node<E> h = head, p = h, q;;) { |
| 121 | + E item = p.item; |
| 122 | + //取出头元素的值,并将其CAS成null。 |
| 123 | + if (item != null && p.casItem(item, null)) { |
| 124 | + // Successful CAS is the linearization point |
| 125 | + // for item to be removed from this queue. |
| 126 | + //"打赌",假设头结点head已经被更新,则沿用h,不然更新为下一个结点 |
| 127 | + if (p != h) // hop two nodes at a time |
| 128 | + updateHead(h, ((q = p.next) != null) ? q : p); |
| 129 | + return item; |
| 130 | + } |
| 131 | + //当前结点为空,下一个结点也为空,队列为空,退出循环,返回null。 |
| 132 | + else if ((q = p.next) == null) { |
| 133 | + updateHead(h, p); |
| 134 | + return null; |
| 135 | + } |
| 136 | + //遇到哨兵结点,从新从头开始。 |
| 137 | + else if (p == q) |
| 138 | + continue restartFromHead; |
| 139 | + //此时q == p.next, 将指针向右移动一位。 |
| 140 | + else |
| 141 | + p = q; |
| 142 | + } |
| 143 | + } |
| 144 | + } |
87 | 145 | ``` |
0 commit comments