In [1]:
# 轻量级的消息队列模块queue，主要用于多生产者和消费者模式下的队列实现，特别适合多线程时的消息交换
# 实现了常见的锁语法，临时阻塞线程，防止竞争，有赖于对线程的支持。

In [2]:
# queue模块实现了三种队列：
'''
FIFO：先进先出队列，类似管道。元素只能从队头方向一个一个的弹出，只能从队尾一个一个的放入
LIFO：后进先出队列，也就是栈,元素永远只能在栈顶出入
priority queue：优先级队列，每个元素都带有一个优先值，值越小的越早出去。值相同的，先进入队列的先出去
'''
# queue模块定义了下面几个类和异常:
'''
class queue.Queue(maxsize=0) FIFO队列构造器。maxsize是队列里最多能同时存在的元素个数
class queue.LifoQueue(maxsize=0) LIFO队列构造器。maxsize是队列里最多能同时放置的元素个数
class queue.PriorityQueue(maxsize=0) 优先级队列构造器。maxsize是队列里最多能同时放置的元素个数
exception queue.Empty  从空的队列里请求元素的时候，弹出该异常
exception queue.Full   往满的队列里放入元素的时候，弹出该异常
'''

# Queue对象
'''
Queue.qsize()
返回当前队列内的元素的个数。注意，qsize()大于零不等于下一个get()方法一定不会被阻塞，qsize()小于maxsize也不表示下一个put()方法一定不会被阻塞。

Queue.empty()
队列为空则返回True，否则返回False。同样地，返回True不表示下一个put()方法一定不会被阻塞。返回False不表示下一个get()一定不会被阻塞。

Queue.full()
与empty()方法正好相反。同样不保证下一步的操作不被阻塞。

Queue.put(item, block=True, timeout=None)
item参数表示具体要放入队列的元素。block和timeout两个参数配合使用。其中，如果block=True，timeout=None，队列阻塞，直到有空槽出现；当block=True，timeout=正整数N，如果在等待了N秒后，队列还没有空槽，则弹出Full异常；如果block=False，则timeout参数被忽略，队列有空槽则立即放入，如果没空槽，则弹出Full异常。

Queue.put_nowait(item)
等同于put(item, False)

Queue.get(block=True, timeout=None)
从队列内删除并返回一个元素。如果block=True, timeout=None，队列会阻塞，直到有可供弹出的元素。如果timeout指定为一个正整数N，则在N秒内如果队列内没有可供弹出的元素，则抛出Empty异常。如果block=False，timeout参数会被忽略，此时队列内如果有元素则直接弹出，无元素可弹，则抛出Empty异常。

Queue.get_nowait()
等同于get(False).

# 下面的两个方法用于跟踪排队的任务是否被消费者守护线程完全处理。
Queue.task_done()
表明先前的队列任务已完成。由消费者线程使用。

Queue.join()
阻塞队列，直到队列内的所有元素被获取和处理。
当有元素进入队列时未完成任务的计数将增加。每当有消费者线程调用task_done()方法表示一个任务被完成时，未完成任务的计数将减少。当该计数变成0的时候，join()方法将不再阻塞。
'''

'\nFIFO：先进先出队列，类似管道。元素只能从队头方向一个一个的弹出，只能从队尾一个一个的放入\nLIFO：后进先出队列，也就是栈,元素永远只能在栈顶出入\npriority queue：优先级队列，每个元素都带有一个优先值，值越小的越早出去。值相同的，先进入队列的先出去\n'

In [3]:
import queue

q = queue.Queue(5)   # FIFO模式
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# print(q.get())  # 阻塞了


1
2
3


In [5]:
q = queue.Queue(5)
print(q.maxsize)
print(q.qsize())
print(q.empty())
print(q.full())
q.put('123')
q.put(123)
q.put(['1', '2'])
q.put({'name': 'curry'})
q.put(2.34)
# q.put(None) # 阻塞了


5
0
True
False


In [7]:
import queue
q = queue.LifoQueue(5)  # LIFO模式
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

3
2
1


In [12]:
import queue

q = queue.PriorityQueue()
q.put((3, 'haha'))
q.put((2, 'dana'))
q.put((1, 'curry'))
print(q.get())
print(q.get())
print(q)
q.put((4, 'xixi'))
print(q.get())
print(q.qsize())

(1, 'curry')
(2, 'dana')
<queue.PriorityQueue object at 0x7fad9041f828>
(3, 'haha')
1


In [18]:
# 等待排队任务

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import queue 
import time
import threading

def worker(i):
    while True:
        item = q.get()
        if item is None:
            print('线程%s发现了一个None，可以休息了^-^' % i)
            break
        time.sleep(0.5)  # do_work(item)做具体的工作
        print('线程%s已经完成任务%s了' % (i, item))
        q.task_done()  # 做完后发出任务完成信号，然后继续下一个任务

if __name__ == '__main__':
    num_of_thread = 5
    source = [i for i in range(1, 21)]  # 模拟20个任务
    q = queue.Queue()  # 创建一个FIFO队列对象，不设置上限
    threads = []  # 创建一个线程池
     # 创建指定个数的工作线程，并把他们放到线程池threads中
    for i in range(1, num_of_thread+1):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()
    # 将任务源里的任务逐个放入队列
    for item in source:
        time.sleep(0.5)  # 每隔0.5秒发布一个新任务
        q.put(item)
    
    q.join()  # 阻塞队列直到队列里的任务都完成了
    print('-----工作完成啦!-----')
     # 停止工作线程
    for i in range(num_of_thread):
        q.put(None)
    for t in threads:
        t.join()
    print(threads)
    

线程1已经完成任务1了
线程2已经完成任务2了
线程3已经完成任务3了
线程4已经完成任务4了
线程4已经完成任务5了
线程1已经完成任务6了
线程2已经完成任务7了
线程3已经完成任务8了
线程5已经完成任务9了
线程4已经完成任务10了
线程1已经完成任务11了
线程2已经完成任务12了
线程3已经完成任务13了
线程5已经完成任务14了
线程4已经完成任务15了
线程1已经完成任务16了
线程2已经完成任务17了
线程3已经完成任务18了
线程5已经完成任务19了
线程4已经完成任务20了
-----工作完成啦!-----
线程4发现了一个None，可以休息了^-^线程2发现了一个None，可以休息了^-^线程5发现了一个None，可以休息了^-^线程1发现了一个None，可以休息了^-^线程3发现了一个None，可以休息了^-^[<Thread(Thread-9, stopped 140382918653696)>, <Thread(Thread-10, stopped 140382910260992)>, <Thread(Thread-11, stopped 140382901868288)>, <Thread(Thread-12, stopped 140382348244736)>, <Thread(Thread-13, stopped 140382339852032)>]





