Skip to content

Latest commit

 

History

History
executable file
·
216 lines (204 loc) · 8.28 KB

ConcurrentLinkedQueue.md

File metadata and controls

executable file
·
216 lines (204 loc) · 8.28 KB

ConcurrentLinkedQueue

  • ** 非阻塞**的链表队列
  • 通过CAS实现无锁非阻塞

基于链接节点的、无界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。

Compare and swap(CAS)

  • CAS是一种替代锁存在的无锁并发机制。
  • 该种无锁机制也是乐观锁的体现。
  • Why need CAS:一个修改操作可以分为:从内存中读取值->修改值->写回内存,在高并发的过程中,这一个过程失去了其原子性,所以修改值的原始值可能已经被其他线程修改了。 i != i 这种操作在单线程中一定是true,在多线程中i的值可能在别的线程被更改。
  • 如何实现CAS:在循环中读取这次循环的内存中存储值,进行修改,在写回内存中时比较内存中的值是否还是第一步取出的值,不是的话说明这次循环失败,重新开始,是的话写入新的值。
private static final Unsafe unsafe = Unsafe.getUnsafe(); //JNI实现的Unsafe类。
public final boolean compareAndSet(long expect, long update) {
	return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

Node

  • ConcurrentLinkedQueue的内部私有类
  • 使用了通过C++编写的Unsafe类
   private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
		//通过CAS保证了操作的原子性。
		//CAS应用:表示设置当前Node的item值。第一个参数为期望值,第二个参数为设置目标值。当当前值等于期望值时(就是没有被其他人改过),就会将目标设置为val。
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
		//CAS 下一个结点
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

ConcurrentLinkedQueue's API

插入元素

  • add() 通过offer方法添加元素。
    public boolean add(E e) {
        return offer(e);
    }
  • offer() clq
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // 当尾结点就是指向链表的最后一个元素,尝试CAS新的结点
                if (p.casNext(null, newNode)) {	//将新的结点添加到链表的底部
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // 如上图所示,如果当前p不是tail结点,就将新接入的结点当做尾结点。相当于尾结点一次性跳跃两个结点。
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)	//哨兵结点情况
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
				//如果在读取中,t的值被更新了,“打赌”t被更新正确了,不然重新从头开始。
                p = (t != (t = tail)) ? t : head;
            else	//q不是最后一个结点
                // Check for tail updates after two hops.
				//如果在该操作过程中,t的值被更新了,我们“打赌”t的值被更新正确
				//不然我们将p指针向后移动一位,重新进入循环
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

高效读写的队列:深度剖析ConcurrentLinkedQueue

出队列

cdl

  • poll() 出队列不会阻塞,如果队列为空则返回null。
    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
				//取出头元素的值,并将其CAS成null。
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
					//"打赌",假设头结点head已经被更新,则沿用h,不然更新为下一个结点
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
				//当前结点为空,下一个结点也为空,队列为空,退出循环,返回null。
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
				//遇到哨兵结点,从新从头开始。
                else if (p == q)
                    continue restartFromHead;
				//此时q == p.next, 将指针向右移动一位。
                else
                    p = q;
            }
        }
    }

其他方法

  • size() 检查队列大小,O(n),为了保证并发性不得不每次都遍历链表。
    public int size() {	//返回检查队列的大小
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }
  • isEmpty() 判断链表非空,O(1)级别

Test

  • 生产者
public class ConcurrentLinkQueueProducer implements Runnable{
	private ConcurrentLinkedQueue<Integer> q;
	public ConcurrentLinkQueueProducer(ConcurrentLinkedQueue<Integer> q) {
		super();
		this.q = q;
	}
	@Override
	public void run() {
		AtomicInteger al = new AtomicInteger(0);//通过AtomicInteger保证原子性,避免锁。
		while(true){
			q.offer(al.get());
			System.out.println("Producer: put " + al.getAndIncrement() + " into queue...");
			try {
				Thread.sleep(10);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
  • 消费者
public class ConcurrentLinkedQueueConsumer implements Runnable {
	private ConcurrentLinkedQueue<Integer> q;
	public ConcurrentLinkedQueueConsumer(ConcurrentLinkedQueue<Integer> q) {
		super();
		this.q = q;
	}
	@Override
	public void run() {
		try {
			while(true){
				System.out.println("Consumer: get " + q.poll() + " from queue...");
				Thread.sleep(10);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
		Thread producer = new Thread(new ConcurrentLinkQueueProducer(q));
		producer.start();
		producer.join(100);
		new Thread(new ConcurrentLinkedQueueConsumer(q)).start();
	}
}