Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlockingQueue #12

Open
Jimmy2Angel opened this issue Apr 17, 2018 · 0 comments
Open

BlockingQueue #12

Jimmy2Angel opened this issue Apr 17, 2018 · 0 comments

Comments

@Jimmy2Angel
Copy link
Owner

首先先介绍一下 Queue、AbstractQueue等接口和类。

Queue

该接口针对添加元素、移除元素、获取第一个元素各有两个方法:

  1. 添加元素:
    • add(E e) 当队列为满时前者会抛出 IllegalStateException
    • offer(E e) 当队列为满时前者会返回 false
  2. 移除元素
    • remove(E e) 当队列为空时前者会抛出 NoSuchElementException
    • poll(E e) 当队列为空时前者会返回 null
  3. 获取第一个元素:
    • element( ) 当队列为空时前者会抛出 NoSuchElementException
    • peek( ) 当队列为空时前者会返回 null

BlockingQueue

该接口实现了 Queue 接口,针对添加元素、移除元素各有两个方法

  1. 添加元素:

    一直等待:

    void put(E e) throws InterruptedException;

    超时等待:

    boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
  2. 移除元素:

    一直等待:

    E take() throws InterruptedException;

    超时等待:

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

AbstractQueue

add(E e)

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

remove()

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

element()

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

ArrayBlockingQueue

简介

继承体系为:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

拥有的变量如下:

/** The queued items */
final Object[] items;
 
/** items index for next take, poll, peek or remove */
int takeIndex;
 
/** items index for next put, offer, or add */
int putIndex;
 
/** Number of elements in the queue */
int count;
 
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
 
/** Main lock guarding all access */
final ReentrantLock lock;
 
/** Condition for waiting takes */
private final Condition notEmpty;
 
/** Condition for waiting puts */
private final Condition notFull;
 
/**
* Shared state for currently active iterators, or null if there
* are known not to be any.  Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;

线程安全实现

线程安全主要是通过可重入锁 ReentrantLock 来实现的。

add(E e)

public boolean add(E e) {
    return super.add(e);
}

offer(E e)

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 如果队列已满返回 false
        if (count == items.length)
            return false;
        else {
            // 添加元素
            enqueue(e);
            return true;
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

enqueue(E e)

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 类似于notify()
    notEmpty.signal();
}

poll()

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

dequeue()

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

阻塞

阻塞是基于 Condition 接口实现的。Condition 拥有类似的操作:await/signal。Condition 和一个 Lock 相关,由lock.newCondition() 来创建。只有当前线程获取了这把锁,才能调用 Condition 的 await 方法来等待通知,否则会抛出异常。

put(E e)

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 如果队列已满,则阻塞等待队列移除元素
            notFull.await();
        // 该方法中包含移除元素发生阻塞时需要 notEmpty.signal()
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
  • 实现阻塞的关键就是就是这个 notFull 的 Condition,当队列已满,await 方法会阻塞当前线程,并且释放Lock,等待其他线程调用 notFull 的 signal 来唤醒这个阻塞的线程。那么这个操作必然会在拿走元素的操作中出现,这样一旦有元素被拿走,阻塞的线程就会被唤醒。
  • 发出 signal 的线程肯定拥有这把锁的,因此 await 方法所在的线程肯定是拿不到这把锁的,await 方法不能立刻返回,需要尝试获取锁直到拥有了锁才可以从 await 方法中返回。

take()

同样对于take方法会有一个 notEmpty 的 Condition。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果队列为空,则阻塞等待队列添加一个元素
            notEmpty.await();
        // 该方法中包含移除元素发生阻塞时需要 notFull.signal()
        return dequeue();
    } finally {
        lock.unlock();
    }
}

总结

  • 线程安全是基于可重入锁 ReentrantLock 实现的。
  • 阻塞是基于 Condition 的 await/signal 实现的。

LinkedBlockingQueue

简介

LinkedBlockingQueue 是使用一个链表来实现,拥有一个内部类 Node 其中 Node 结构如下:

/**
* Linked list node class
*/
static class Node<E> {
    E item;
     
    /**
    * One of:
    * - the real successor Node
    * - this Node, meaning the successor is head.next
    * - null, meaning there is no successor (this is the last node)
    */
    Node<E> next;
     
    Node(E x) { item = x; }
}

拥有成员变量如下,其中包括一个 head 节点(item 为 null,next 节点才是first 节点)和一个 last 节点(next 节点为 null),然后还包括两个锁:takeLock 和 putLock

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
 
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
 
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
 
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
 
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
 
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
 
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
 
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

方法解析

offer(E e)

源码如下,另外还有一个重载方法 offer(E e, long timeout, TimeUnit unit),在队列已满的时候,进行超时阻塞。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列已满则返回 false
    if (count.get() == capacity)
        return false;
    int c = -1;
    // 根据待添加元素 e 创建一个 node 节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            // 将 last 节点的 next 设置为 node 节点 ,然后将 node 节点设置为 last 节点
            enqueue(node);
            c = count.getAndIncrement();
            // 如果添加该元素后队列还未满,则调用 notFull.signal() 方法,即唤醒添加元素时由于队列已满导致阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
    // 一个获取 takeLock、notEmpty.signal()、释放 takeLock 操作过程。也就是通知其他线程可以移除元素了。
        signalNotEmpty();
    return c >= 0;
}

enqueue(Node e)

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

poll()

和 offer(E e) 方法类似,除此之外也提供了一个重载方法 poll(long timeout, TimeUnit unit),在队列为空的时候,进行超时阻塞。

public E poll() {
    final AtomicInteger count = this.count;
    // 队列为空则返回 null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            // 获取 first 节点的 item,然后设置为 head 节点(first.item = null)
            x = dequeue();
            c = count.getAndDecrement();
            // 如果移除元素后队列不为空,则唤醒移除元素时由于队列为空导致阻塞的线程
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // 一个获取 putLock、notFull.signal()、释放 putLock 操作过程。也就是通知其他线程可以添加元素了。
    if (c == capacity)
        signalNotFull();
    return x;
}

dequeue()

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

put(E e)

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
        * Note that count is used in wait guard even though it is
        * not protected by lock. This works because count can
        * only decrease at this point (all other puts are shut
        * out by lock), and we (or some other waiting put) are
        * signalled if it ever changes from capacity. Similarly
        * for all other uses of count in other wait guards.
        */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

take()

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

remove(Object o)

因为需要操作整个链表,因此需要同时拥有两个锁才能操作。

public boolean remove(Object o) {
    if (o == null) return false;
    // putLock.lock(); takeLock.lock();
    fullyLock();
    try {
        // 遍历查找
        for (Node<E> trail = head, p = trail.next;
            p != null;
            trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

contains(Object o)

因为需要操作整个链表,因此需要同时拥有两个锁才能操作。

public boolean contains(Object o) {
    if (o == null) return false;
    // putLock.lock(); takeLock.lock();
    fullyLock();
    try {
        // 遍历查找
        for (Node<E> p = head.next; p != null; p = p.next)
        if (o.equals(p.item))
            return true;
        return false;
    } finally {
        fullyUnlock();
    }
}

总结

  • 底层是基于链表实现的。
  • 拥有两个锁:putLock 和 takeLock。分别对应添加和移除元素操作。
  • 与 putLock 和 takeLock 相对应的两个 condition 分别为 notFull 和 notEmpty。
  • offer(E e) 方法在队列已满的时候返回 false,put(E e) 方法在队列已满的时候阻塞。
  • poll() 方法在队列为空的时候返回 null,take() 方法在队列为空的时候阻塞。
  • remove(Object o) 和 contains(Object o) ,因为需要操作整个链表,因此需要同时拥有两个锁才能操作。
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant